68 lines
1.8 KiB
JavaScript
68 lines
1.8 KiB
JavaScript
'use strict';
|
|
|
|
const amqplib = require('amqplib');
|
|
|
|
/**
 * Thin proxy around one amqplib connection.
 *
 * Builds the connection options from `conf`, starts connecting immediately
 * and prepares the registries that initProduce()/initConsume() fill in.
 *
 * @param {Object} conf - Broker settings.
 * @param {string} conf.host - Broker host name (sent as `hostname`).
 * @param {number} conf.port - Broker port.
 * @param {string} conf.user - Login name (sent as `username`).
 * @param {string} conf.passwd - Login password (sent as `password`).
 * @param {string} conf.vhost - Virtual host to open.
 */
function RabbitMQPrx(conf) {
  const { host, port, user, passwd, vhost } = conf;

  // Options object handed to amqplib.connect(). frameMax and heartbeat are
  // fixed at 0 (see amqplib's connect options for what 0 selects).
  this.param = {
    protocol: 'amqp',
    hostname: host,
    port,
    username: user,
    password: passwd,
    frameMax: 0,
    heartbeat: 0,
    vhost,
  };

  // Whatever connect() returns is kept as-is; the init* methods wait on it
  // before they open a channel.
  this.amqp = amqplib.connect(this.param);

  // Per-exchange registries, keyed by exchange name.
  this.produces = {};
  this.consumes = {};
}
|
|
|
|
/**
 * Prepare an exchange for publishing.
 *
 * Waits for the connection, opens a new channel on it, asserts the exchange
 * on that channel and then registers the channel under
 * `this.produces[excName]` for publish() to use.
 *
 * @param {string} excName - Exchange name; also the registry key.
 * @param {string} excType - Exchange type passed to assertExchange().
 * @param {Object} [excArgs={}] - Options passed to assertExchange().
 * @returns {Promise<void>} Settles once the exchange has been asserted;
 *   rejects with whatever the connection or channel calls reject with.
 */
RabbitMQPrx.prototype.initProduce = async function(excName, excType, excArgs = {}) {
  const connection = await this.amqp;
  const producerChannel = await connection.createChannel();

  await producerChannel.assertExchange(excName, excType, excArgs);

  // Registered only after the assert succeeded, so a failed init leaves the
  // registry untouched.
  this.produces[excName] = { channel: producerChannel };
};
|
|
|
|
/**
 * Set up a consumer: assert the exchange and the queue, bind the queue to
 * the exchange for every routing key, then start consuming.
 *
 * Every bindQueue() call is awaited before consume() is issued, so the
 * consumer never starts ahead of its bindings and a failed bind rejects this
 * call instead of surfacing as an unhandled rejection.
 *
 * @param {string} excName - Exchange name; also the key in `this.consumes`.
 * @param {string} excType - Exchange type passed to assertExchange().
 * @param {Object} excArgs - Options passed to assertExchange().
 * @param {string} queueName - Queue name passed to assertQueue(); the name
 *   reported back by assertQueue() is the one that gets bound and consumed.
 * @param {Object} queueArgs - Options passed to assertQueue().
 * @param {string|string[]|Object<string,string>|null} routeKey - Routing
 *   key(s) to bind. An array or plain object contributes each of its own
 *   enumerable values; a string is a single key; null/undefined binds
 *   nothing.
 * @param {Function} cb - Message handler passed to consume(). The consumer
 *   is registered with `{noAck: true}`.
 * @returns {Promise<void>} Settles once consume() has been confirmed;
 *   rejects with the first error from any of the broker calls.
 */
RabbitMQPrx.prototype.initConsume = async function(excName, excType, excArgs, queueName, queueArgs, routeKey, cb) {
  const conn = await this.amqp;
  const channel = await conn.createChannel();

  await channel.assertExchange(excName, excType, excArgs);
  const qok = await channel.assertQueue(queueName, queueArgs);

  // Normalise the routing keys. A bare string must not be walked character
  // by character, and Object.values() copes with prototype-less objects
  // where routeKey.hasOwnProperty would throw.
  let bindingKeys;
  if (routeKey == null) {
    bindingKeys = [];
  } else if (typeof routeKey === 'string') {
    bindingKeys = [routeKey];
  } else {
    bindingKeys = Object.values(routeKey);
  }

  // All binds are issued up front and awaited together; the first failure
  // rejects and the remaining rejections are absorbed by Promise.all.
  await Promise.all(
    bindingKeys.map((key) => channel.bindQueue(qok.queue, excName, key))
  );

  await channel.consume(qok.queue, cb, {noAck: true});

  // Registered only after the whole setup succeeded.
  this.consumes[excName] = { channel };
};
|
|
|
|
/**
 * Publish one message to an exchange that was prepared with initProduce().
 *
 * @param {string} excName - Exchange name, as given to initProduce().
 * @param {string} routeKey - Routing key for the message.
 * @param {string|Buffer|Array|ArrayBuffer} msg - Payload; converted with
 *   Buffer.from(), so it must be something Buffer.from() accepts.
 * @returns {boolean} The value returned by channel.publish(). In amqplib,
 *   `false` signals that the channel's write buffer is full and the caller
 *   should wait for the channel's 'drain' event before sending more.
 * @throws {Error} If no producer channel is registered for `excName`.
 */
RabbitMQPrx.prototype.publish = function(excName, routeKey, msg) {
  const produce = this.produces[excName];

  // Fail with a message that names the exchange instead of the generic
  // "cannot read properties of undefined" TypeError.
  if (!produce || !produce.channel) {
    throw new Error(`RabbitMQPrx.publish: no producer channel for exchange "${excName}"; call initProduce() first`);
  }

  // Hand the back-pressure flag through to the caller instead of dropping it.
  return produce.channel.publish(excName, routeKey, Buffer.from(msg));
};
|
|
|
|
module.exports = RabbitMQPrx;
|