Skip to content

RabbitMQ

消息队列服务RabbitMQ笔记

Node.js

安装

RabbitMQ是AMQP协议的实现者,所以可以直接使用amqp协议的客户端。

安装amqp node客户端:

shell
npm install amqplib

普通的工作队列

竞争消费模式

所有的消费者共享一个队列,消息队列服务会自动分发消息给空闲的消费者。

实例代码:

producer 生产者:

js
import amqplib from 'amqplib'
// 建立连接
const connection = await amqplib.connect('amqp://user:pass@<ip>:<port>')
// 创建频道
const channel = await connection.createChannel()
// 设置队列名
const queueName = 'my_queue'
// 声明一个队列供我们传输用
// 选项durable是持久化
channel.assertQueue(queueName, { durable: false })
// 消息二进制传输
channel.sendToQueue(queueName, Buffer.from('Hello World!'))

consumer 消费者:

js
import amqplib from 'amqplib'
// 建立连接
const connection = await amqplib.connect('amqp://user:pass@<ip>:<port>')
// 创建频道
const channel = await connection.createChannel()
// 设置队列名
const queueName = 'my_queue'
// 声明要使用的队列, 消费者消费前声明队列是为了保证队列存在
await channel.assertQueue(queueName, { durable: false })
console.log(' [*] Waiting for messages in %s.', queueName)
channel.consume(queueName, (msg) => {
  console.log(' [x] Received %s', msg.content.toString())
  // 手动确认
  // channel.ack(msg)
}, { noAck: true })
// noAck配置为true代表在消息处理完后会自动确认消息。

发布订阅

RabbitMQ中消息传递模型是生产者通过交换机去推送消息,生产者甚至可以不知道具体推送到哪个消息队列了,只需要推送到指定的交换机,提供一个路由键,让交换机去通过路由键来推送到消息队列。(类似于路由表)

publisher -> exchange -bind-> queue <- consumer

消息队列要接受消息,需要绑定到交换机上,提供一个路由键,让交换机知道怎么联系到这个消息队列。一个路由键可以绑定多个消息队列,同一个路由键的消息队列平等的收到交换机推送来的消息。

交换机会平等的推送路由键对应的消息队列中。这就可以一对多了。

交换机一共有四个模式:

  • direct:路由键完全匹配
  • fanout:路由键忽略,直接推送到所有绑定的消息队列
  • topic:路由键模糊匹配
  • headers:忽略路由键,根据消息的headers属性匹配

示例代码

producer 生产者

js
import amqplib from 'amqplib'

// 连接rabbitmq服务
const connection = await amqplib.connect('amqp://user:pass@<ip>:<port>')
// 创建频道
const channel = await connection.createChannel()

const exchangeName = 'my_test_exchange'

// 声明交换机exchange, 没有就创建,有就无事发生
/**
 * @param exchange 交换机名称
 * @param type 交换机类型
 * * direct: 定向
 * * fanout: 扇形(广播)
 * * topic: 主题
 * * headers: 头交换机
 * @param options 交换机选项
 * * durable: 是否持久化
 * * autoDelete: 是否自动删除
 */
await channel.assertExchange(exchangeName, 'direct', {
  durable:false
})

// publish 发布
/**
 * @param exchange 交换机名称
 * @param routingKey 路由键
 * @param content 消息内容
 * @param options 消息选项
 * * persistent: 是否持久化
 * * expiration: 消息过期时间
 * * contentType: 消息类型
 * * contentEncoding: 消息编码
 * * headers: 消息头 (适用于headers交换机模式)
 * * deliveryMode: 消息投递模式
 * * priority: 消息优先级
 * * correlationId: 消息关联ID
 * * replyTo: 消息回复队列
 * * expiration: 消息过期时间
 * * messageId: 消息ID
 * * timestamp: 消息时间戳
 */
channel.publish(exchangeName, 'key', Buffer.from('Hello Exchange!mode direct'))

// 自动断开连接和退出进程
setTimeout(async () => {
  // 先断开频道再断开连接
  await channel.close()
  await connection.close()
  process.exit(0)
}, 500);

consumer 消费者

js
import amqplib from 'amqplib'

// 连接rabbitmq服务
const connection = await amqplib.connect('amqp://user:pass@<ip>:<port>')
// 创建频道
const channel = await connection.createChannel()

console.log('waiting for messages...')

const exchangeName = 'my_test_exchange'
const queueName = 'my_test_queue_1'

// 声明交换机 避免交换机不存在
await channel.assertExchange(exchangeName, 'direct', {
  durable: false
})

// 声明队列
// exclusive选项:当exclusive为true时,队列仅对声明它的连接可见,并且在连接断开时自动删除
await channel.assertQueue(queueName, {
  exclusive: true
})

// 将队列接入交换机(交换机绑定队列)方法来源于channel
// 需要 队列名称 交换机名称 路由匹配的key{routingKey}
// routingKey的作用就是让交换机知道消息应该发送到哪些个队列,一个key可以绑定多个队列
await channel.bindQueue(queueName, exchangeName, 'key')

// 消费消息
channel.consume(queueName, (msg) => {
  console.log('[x] consume received:', msg?.content.toString())
}, {
  noAck: true
})