npm-package/utils/impl/prxrmq.js

68 lines
1.8 KiB
JavaScript
Raw Permalink Normal View History

2019-08-28 15:44:52 +08:00
'use strict';
const amqplib = require('amqplib');
/**
 * RabbitMQ proxy: starts one amqplib connection and keeps per-exchange
 * registries of producer and consumer channels.
 *
 * The connection is started immediately; `this.amqp` holds whatever
 * `amqplib.connect()` returns (a promise for the connection), and the
 * init* methods wait on it before opening channels.
 *
 * @param {Object} conf - Broker settings. The fields read are `host`, `port`,
 *   `user`, `passwd` and `vhost`; they are forwarded to amqplib as `hostname`,
 *   `port`, `username`, `password` and `vhost` respectively.
 */
function RabbitMQPrx(conf) {
  const { host, port, user, passwd, vhost } = conf;

  // Object-form connection options for amqplib.connect(). frameMax and
  // heartbeat are both fixed at 0 (per the amqplib docs: no frame-size limit
  // and no heartbeat).
  const connectOptions = {
    protocol: 'amqp',
    hostname: host,
    port: port,
    username: user,
    password: passwd,
    frameMax: 0,
    heartbeat: 0,
    vhost: vhost
  };

  this.param = connectOptions;
  this.amqp = amqplib.connect(connectOptions);

  // Registries filled in by initProduce() / initConsume(), keyed by exchange name.
  this.produces = {};
  this.consumes = {};
}
/**
 * Prepares a producer for one exchange: waits for the shared connection,
 * opens a new channel on it, declares the exchange on that channel and
 * records the channel under `this.produces[excName]`.
 *
 * @param {string} excName - Exchange name; also the key used in `this.produces`.
 * @param {string} excType - Exchange type passed to `assertExchange`.
 * @param {Object} [excArgs={}] - Options passed to `assertExchange`.
 * @returns {Promise<void>} Resolves after the exchange has been asserted and
 *   the producer registered; rejects if connecting, opening the channel or
 *   asserting the exchange fails (nothing is registered in that case).
 */
RabbitMQPrx.prototype.initProduce = async function(excName, excType, excArgs = {}) {
  const connection = await this.amqp;
  const channel = await connection.createChannel();

  // Declare the exchange before exposing the channel to publish().
  await channel.assertExchange(excName, excType, excArgs);

  this.produces[excName] = { channel: channel };
};
/**
 * Sets up a consumer for one exchange: opens a new channel, declares the
 * exchange and the queue, binds the queue under every routing key, starts
 * consuming and records the channel under `this.consumes[excName]`.
 *
 * Every binding is awaited before consumption starts, so a failed
 * `bindQueue` rejects this call instead of surfacing as an unhandled
 * rejection.
 *
 * Note: the registry is keyed by exchange name, so a second call for the
 * same exchange replaces the previously recorded channel entry.
 *
 * @param {string} excName - Exchange name; also the key used in `this.consumes`.
 * @param {string} excType - Exchange type passed to `assertExchange`.
 * @param {Object} excArgs - Options passed to `assertExchange`.
 * @param {string} queueName - Queue name passed to `assertQueue`.
 * @param {Object} queueArgs - Options passed to `assertQueue`.
 * @param {string|string[]|Object|null} routeKey - Routing key(s) to bind. An
 *   array or object contributes each of its own enumerable values; a plain
 *   string is treated as one single key; null/undefined binds nothing.
 * @param {Function} cb - Message handler handed to `channel.consume`.
 * @returns {Promise<void>} Resolves once consumption has started; rejects if
 *   any step fails (nothing is registered in that case).
 */
RabbitMQPrx.prototype.initConsume = async function(excName, excType, excArgs, queueName, queueArgs, routeKey, cb) {
  const connection = await this.amqp;
  const channel = await connection.createChannel();

  await channel.assertExchange(excName, excType, excArgs);
  const qok = await channel.assertQueue(queueName, queueArgs);

  // Normalise routeKey to a list. A bare string must stay whole: iterating it
  // would bind one key per character.
  let bindingKeys;
  if (routeKey == null) {
    bindingKeys = [];
  } else if (typeof routeKey === 'string') {
    bindingKeys = [routeKey];
  } else {
    bindingKeys = Object.values(routeKey);
  }

  // Issue the binds in order, then wait for all of them so failures propagate
  // and consumption only starts on a fully bound queue.
  await Promise.all(bindingKeys.map(function(key) {
    return channel.bindQueue(qok.queue, excName, key);
  }));

  // Use the name from the assertQueue reply (qok.queue) rather than queueName.
  // noAck: true is fixed here, so cb is not expected to ack.
  await channel.consume(qok.queue, cb, {noAck: true});

  this.consumes[excName] = { channel: channel };
};
/**
 * Publishes a message to an exchange through the channel that initProduce()
 * registered for it.
 *
 * @param {string} excName - Exchange to publish to; must have been set up
 *   with initProduce() first.
 * @param {string} routeKey - Routing key for the message.
 * @param {*} msg - Payload; converted with `Buffer.from(msg)`, so it must be
 *   a value that `Buffer.from` accepts.
 * @returns {boolean} The value returned by `channel.publish()`; in amqplib
 *   `false` signals that the channel's write buffer is full and the caller
 *   should wait for the channel's 'drain' event.
 * @throws {Error} If no producer has been initialised for `excName`.
 */
RabbitMQPrx.prototype.publish = function(excName, routeKey, msg) {
  const producer = this.produces[excName];

  // Fail with an actionable message instead of an opaque
  // "cannot read properties of undefined" TypeError.
  if (!producer || !producer.channel) {
    throw new Error('RabbitMQPrx.publish: no producer for exchange "' + excName + '"; call initProduce() first');
  }

  return producer.channel.publish(excName, routeKey, Buffer.from(msg));
};
// CommonJS export: the module's value is the RabbitMQPrx constructor itself.
module.exports = RabbitMQPrx;