自定义传输
Nest provides TCP and Redis as a built-in transport methods. It makes prototyping incredibly fast & easy, but sometimes you might want to use another type of transport, e.g. RabbitMQ messaging. Is it possible? Yes, sure.
Nest提供TCP和Redis作为内置传输方法。这些方法使得原型化特别容易,特别迅速。但是,有时候你可能想使用其他类型的传输类型,比如RabbitMQ messaging。
You can port any transport strategy to Nest. You only have to create a class, which extends Server and implements CustomTransportStrategy interface.
你只需要创建一个类,就可以将任何传输方式应用到Nest中。因为这个类可以扩展服务器并实现CustomTransportStrategy接口。
The Server class provides getHandlers() method, which returns MessagePattern mappings (object, where key is a pattern and value is a callback), while CustomTransportStrategy forces on you to implement both listen() and close() methods.
该服务器类提供getHandlers()方法,该方法返回MessagePattern映射(对象,该对象中,key是模式,值是回调函数),CustomTransportStrategy迫使你实现listen()andclose()方法。
Let's create a simple RabbitMQServer class. We will use ampqlib library.
让我们使用amprlib库创建一个简单的RabbitMQServer类。
import * as amqp from 'amqplib';
import { Server, CustomTransportStrategy } from '@nestjs/microservices';
import { Observable } from 'rxjs/Observable';
export class RabbitMQServer extends Server implements CustomTransportStrategy {
private server = null;
private channel = null;
constructor(
private readonly host: string,
private readonly queue: string) {
super();
}
public async listen(callback: () => void) {
await this.init();
this.channel.consume(`${this.queue}_sub`, this.handleMessage.bind(this), { noAck: true });
}
public close() {
this.channel && this.channel.close();
this.server && this.server.close();
}
private handleMessage(message) {
const { content } = message;
const msg = JSON.parse(content.toString());
const handlers = this.getHandlers();
const pattern = JSON.stringify(msg.pattern);
if (!this.messageHandlers[pattern]) {
return;
}
const handler = this.messageHandlers[pattern];
const response$ = handler(msg.data) as Observable<any>;
response$ && this.send(response$, (data) => this.sendMessage(data));
}
private sendMessage(message) {
this.channel.sendToQueue(`${this.queue}_pub`, Buffer.from(JSON.stringify(message)));
}
private async init() {
this.server = await amqp.connect(this.host);
this.channel = await this.server.createChannel();
this.channel.assertQueue(`${this.queue}_sub`, { durable: false });
this.channel.assertQueue(`${this.queue}_pub`, { durable: false });
}
}
The most interesting method is handleMessage(). Its resposibility is to match pattern with appropriate handler and call it with received data. Also, notice that I used send() method inherited from Server class. You should use it too if you want to avoid e.g. sending disposed message when Observable is completed.
最有趣的方法是handleMessage()。该方法负责用合适的handler匹配模式,并且用接收的数据调用该模式。请注意,我使用的是从服务器类继承的send()方法。你也应该使用该方法避免Observable完成后发送设置信息。
Last step is to set-up our RabbitMQ strategy:
最后一个步骤是设置我们的RabbitMQ方法:
const app = NestFactory.createMicroservice(ApplicationModule, {
strategy: new RabbitMQServer('amqp://localhost', 'example'),
});
It's everything!
客户端
The RabbitMQ server is listening for messages. Now, we must create a client class, which should extends built-in ClientProxy. We only have to override abstract sendSingleMessage() method.
RabbitMQ服务器监听消息。现在,我们必须创建一个可以扩展内置ClientProxy的客户端类。
Let's create RabbitMQClient class:
让我们来创建一个RabbitMQClient类。
import * as amqp from 'amqplib';
import { ClientProxy } from '@nestjs/microservices';
export class RabbitMQClient extends ClientProxy {
constructor(
private readonly host: string,
private readonly queue: string) {
super();
}
protected async sendSingleMessage(msg, callback: (err, result, disposed?: boolean) => void) {
const server = await amqp.connect(this.host);
const channel = await server.createChannel();
const sub = this.getSubscriberQueue();
const pub = this.getPublisherQueue();
channel.assertQueue(sub, { durable: false });
channel.assertQueue(pub, { durable: false });
channel.consume(pub, (message) => this.handleMessage(message, server, callback), { noAck: true });
channel.sendToQueue(sub, Buffer.from(JSON.stringify(msg)));
}
private handleMessage(message, server, callback: (err, result, disposed?: boolean) => void) {
const { content } = message;
const { err, response, disposed } = JSON.parse(content.toString());
if (disposed) {
server.close();
}
callback(err, response, disposed);
}
private getPublisherQueue(): string {
return `${this.queue}_pub`;
}
private getSubscriberQueue(): string {
return `${this.queue}_sub`;
}
}
How to use it? There is nothing special, just create an instance:
该怎么使用它呢?只需要创建一个实例即可。
export class ClientController {
private readonly client = new RabbitMQClient('amqp://localhost', 'example');
}
The rest work equivalently (use send() method).
剩余的跟(use send() 方法)相同。