'use strict';

const amqplib = require('amqplib');

/**
 * Small proxy around a single amqplib connection that keeps one channel per
 * exchange for publishing (`produces`) and one per exchange for consuming
 * (`consumes`).
 *
 * The connection attempt starts immediately in the constructor; the pending
 * promise is stored in `this.amqp` and awaited by `initProduce`/`initConsume`,
 * so a connection failure surfaces as a rejection of those calls.
 *
 * @param {object} conf - Connection settings.
 * @param {string} conf.host - Broker hostname.
 * @param {number} conf.port - Broker port.
 * @param {string} conf.user - Username.
 * @param {string} conf.passwd - Password.
 * @param {string} conf.vhost - Virtual host.
 */
function RabbitMQPrx(conf) {
  this.param = {
    protocol: 'amqp',
    hostname: conf.host,
    port: conf.port,
    username: conf.user,
    password: conf.passwd,
    frameMax: 0,
    heartbeat: 0,
    vhost: conf.vhost,
  };
  this.amqp = amqplib.connect(this.param);
  this.produces = {};
  this.consumes = {};
}

/**
 * Opens a channel, asserts the exchange on it and registers the channel under
 * `this.produces[excName]` so that `publish` can use it.
 *
 * @param {string} excName - Exchange name.
 * @param {string} excType - Exchange type passed to `assertExchange`.
 * @param {object} [excArgs={}] - Options passed to `assertExchange`.
 * @returns {Promise<void>} Resolves once the exchange has been asserted.
 */
RabbitMQPrx.prototype.initProduce = async function (excName, excType, excArgs = {}) {
  const conn = await this.amqp;
  const channel = await conn.createChannel();
  await channel.assertExchange(excName, excType, excArgs);
  this.produces[excName] = { channel };
};

/**
 * Opens a channel, asserts the exchange and queue, binds the queue to the
 * exchange for every routing key, then starts consuming with `noAck: true`.
 * The channel is registered under `this.consumes[excName]`.
 *
 * Every bind is awaited before consuming starts, so a failed bind rejects
 * this call instead of becoming an unhandled rejection.
 *
 * @param {string} excName - Exchange name.
 * @param {string} excType - Exchange type passed to `assertExchange`.
 * @param {object} excArgs - Options passed to `assertExchange`.
 * @param {string} queueName - Queue name passed to `assertQueue`.
 * @param {object} queueArgs - Options passed to `assertQueue`.
 * @param {string|string[]|Object<string, string>} routeKey - Routing key(s).
 *   An array or object contributes each of its own enumerable values; a plain
 *   string is treated as one routing key; `null`/`undefined` binds nothing.
 * @param {function} cb - Message callback handed to `channel.consume`.
 * @returns {Promise<void>} Resolves once the consumer has been registered.
 */
RabbitMQPrx.prototype.initConsume = async function (excName, excType, excArgs, queueName, queueArgs, routeKey, cb) {
  const conn = await this.amqp;
  const channel = await conn.createChannel();

  await channel.assertExchange(excName, excType, excArgs);
  const qok = await channel.assertQueue(queueName, queueArgs);

  // Normalise the routing keys. A bare string must not be iterated, otherwise
  // the queue would be bound once per character.
  let keys;
  if (routeKey === null || routeKey === undefined) {
    keys = [];
  } else if (typeof routeKey === 'string') {
    keys = [routeKey];
  } else {
    keys = Object.keys(routeKey).map((name) => routeKey[name]);
  }

  for (const key of keys) {
    await channel.bindQueue(qok.queue, excName, key);
  }

  await channel.consume(qok.queue, cb, { noAck: true });
  this.consumes[excName] = { channel };
};

/**
 * Publishes a message to an exchange previously prepared with `initProduce`.
 *
 * @param {string} excName - Exchange name (must have been initialised).
 * @param {string} routeKey - Routing key for the message.
 * @param {string|Buffer|Array|ArrayBuffer} msg - Payload; converted with
 *   `Buffer.from(msg)`.
 * @returns {*} The value returned by `channel.publish` (amqplib documents it
 *   as a boolean write-buffer hint).
 * @throws {Error} If `initProduce` has not completed for `excName`.
 */
RabbitMQPrx.prototype.publish = function (excName, routeKey, msg) {
  const produce = this.produces[excName];
  if (!produce || !produce.channel) {
    throw new Error(`RabbitMQPrx.publish: exchange "${excName}" is not initialised; call initProduce first`);
  }
  return produce.channel.publish(excName, routeKey, Buffer.from(msg));
};

module.exports = RabbitMQPrx;