Sử dụng binding Exchange to Exchange trong RabbitMQ

Công Nghệ
Sử dụng binding Exchange to Exchange trong RabbitMQ
Bài viết được sự cho phép của tác giả Giang Phan Trong các bài viết trước chúng ta đã cùng tìm hiểu về các loại Exchange trong RabbitMQ. Có một câu hỏi đặt ra là có thể thực hiện binding một Exchange đến Exchange khác hay không? Câu trả lời...

Bài viết được sự cho phép của tác giả Giang Phan

Trong các bài viết trước chúng ta đã cùng tìm hiểu về các loại Exchange trong RabbitMQ. Có một câu hỏi đặt ra là có thể thực hiện binding một Exchange đến Exchange khác hay không? Câu trả lời là có và chúng ta sẽ thấy cách thực hiện như thế nào trong phần tiếp theo của bài viết này.

Flow của một Message trong Exchange-to-Exchange bindings

Để dễ hiểu, tôi sẽ mô tả Flow của một Message được chuyển từ Topic Exchange sang Header Exchange.

Sơ đồ bên dưới là sự kết hợp của sơ đồ ở bài viết Topic Exchange và Header Exchange. Điểm khác biệt duy nhất là cách mà Header Exchange binding tới Topic Exchange với routing key pattern “#.gpcoder.com“.

Sử dụng binding Exchange to Exchange trong RabbitMQSử dụng binding Exchange to Exchange trong RabbitMQ

  • Một Producer publish một Message đến source Exchange với một routing key dựa trên loại của Exchange. Trong trường hợp này là GPCoderTopicExchange.
  • Có 2 Queue: QJava và QAll được binding tới Exchange GPCoderTopicExchange lần lượt với routing key patter:
    • QJava : Queue này sẽ nhận tất cả message có Key match với routing key “java.*.gpcoder.com“. Nghĩa là chỉ nhận các message cho một topic Java cụ thể từ gpcoder.com, chẳng hạn: java.core.gpcoder.com, java.collection.gpcoder.com.
    • QAll : Queue này nhận tất cả message có Key match với routing key “#.gpcoder.com“. Nghĩa là nhận tất cả message từ gpcoder.com, chẳng hạn: design-pattern.gpcoder, java.gpcoder.com, creational.design-pattern.gpcoder.com.
  • Một Exchange khác là GPCoderHeadersExchange binding tới Exchange GPCoderTopicExchange với routing key pattern #.gpcoder.com.
  • Có 3 Queue: QDeveloper, QManager và QPublished được binding tới Exchange GPCoderHeadersExchange với các header:
    • QDeveloper : Queue này sẽ nhận tất cả message có header là {“dev”, “Developer Channel”} hoặc {“general”, “General Channel”}.
    • QManager : Queue này nhận tất cả message có header là {“dev”, “Developer Channel”} hoặc {“general”, “General Channel”} hoặc {“manager”, “Manager Channel”}.
    • QPublished : Queue này nhận tất cả message có header là {“dev”, “Developer Channel”} và {“access”, “publish”}.
  • Bây giờ nếu một Message được gởi tới Exchange với routing key là design-pattern.gpcoder.com và header là {“dev”, “Developer Channel”} sẽ được chuyển tới Queue QAll và GPCoderHeadersExchange. Sau đó GPCoderHeadersExchange sẽ chuyển đến Queue QDeveloper và QManager dựa vào header {“dev”, “Developer Channel”}.
  • Tương tự vậy, chúng ta cũng có thể định nghĩa một hay nhiều Exchange khác, chẳng hạn GPCoderDirectExchange binding tới GPCoderHeadersExchange và GPCoderTopicExchange.

Một Exchange nhận Message từ publisher được gọi là Source Exchange. Trong sơ đồ trên, GPCoderTopicExchange là Source Exchange.

Một Exchange nhận Message từ một Exchange khác gọi là Destination Exchange. Trong sơ đồ trên, GPCoderHeadersExchange là Destination Exchange.

Ví dụ binding Exchange to Exchange trong RabbitMQ

Một số class của chương trình:

  • ConnectionManager : hỗ trợ tạo Connection đến RabbitMQ.
  • ExchangeChannel :  class util hỗ trợ tạo Echange, Queue, binding Queue đến Exchange, binding Exchange đến Exchange, publish/ subscribe message, …
  • Constant : định nghĩa constant chứa các thông tin về tên Exchange, Queue.
  • HeadersExchangeProducer : để gửi Message đến GPCoderHeadersExchange.
  • TopicExchangeProducer : để gửi Message đến GPCoderTopicExchange.
  • HeadersExchangeConsumer : để nhận Message từ Queue được binding đến GPCoderHeadersExchange.
  • TopicExchangeConsumer : để nhận Message từ Queue được binding đến GPCoderTopicExchange.
  • App: giả lập việc gửi nhận Message thông qua Topic Exchange của RabbitMQ.

ConnectionManager.java

package com.gpcoder.exchange2exchange;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConnectionManager {

private ConnectionManager() {
super();
}

public static Connection createConnection() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
return factory.newConnection();
}
}

ExchangeChannel.java

package com.gpcoder.exchange2exchange;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class ExchangeChannel {

private String exchangeName;
private Channel channel;
private Connection connection;

public ExchangeChannel(Connection connection, String exchangeName) throws IOException {
this.exchangeName = exchangeName;
this.connection = connection;
this.channel = connection.createChannel();
}

public void declareExchange(BuiltinExchangeType exchangeType) throws IOException {
// exchangeDeclare( exchange, builtinExchangeType, durable)
channel.exchangeDeclare(exchangeName, exchangeType, true);
}

public void declareQueues(String ...queueNames) throws IOException {
for (String queueName : queueNames) {
// queueDeclare - (queueName, durable, exclusive, autoDelete, arguments)
channel.queueDeclare(queueName, true, false, false, null);
}
}

public void performQueueBinding(String queueName, String routingKey, Map<String, Object> headers) throws IOException {
// Create bindings - (queue, exchange, routingKey, headers)
channel.queueBind(queueName, exchangeName, routingKey, headers);
}

public void performExchangeBinding(String destination, String source, String routingKey) throws IOException {
// (destination-exchange, source-exchange , routingKey
channel.exchangeBind(destination, source, routingKey);
}

public void subscribeMessage(String queueName) throws IOException {
// basicConsume - ( queue, autoAck, deliverCallback, cancelCallback)
channel.basicConsume(queueName, true, ((consumerTag, message) -> {
System.out.println("[Received] [" + queueName + "]: " + consumerTag);
System.out.println("[Received] [" + queueName + "]: " + new String(message.getBody()));
}), consumerTag -> {
System.out.println(consumerTag);
});
}

public void publishMessage(String message, String routingKey, Map<String, Object> headers) throws IOException {
BasicProperties properties = new BasicProperties()
.builder().headers(headers).build();

// basicPublish - ( exchange, routingKey, basicProperties, body)
System.out.println("[Send] [" + headers + "]: " + message);
channel.basicPublish(exchangeName, routingKey, properties, message.getBytes());
}
}

Constant.java

package com.gpcoder.exchange2exchange;

public final class Constant {

// Exchange

public static final String HEADERS_EXCHANGE_NAME = "GPCoderHeadersExchange";

public static final String TOPIC_EXCHANGE_NAME = "GPCoderTopicExchange";

// Queue

public static final String DEV_QUEUE_NAME = "QDeveloper";

public static final String MANAGER_QUEUE_NAME = "QManager";

public static final String PUBLISHED_QUEUE_NAME = "QPublished";

public static final String JAVA_QUEUE_NAME = "QJava";

public static final String ALL_QUEUE_NAME = "QAll";

// Routing key pattern

public static final String JAVA_ROUTING_KEY = "java.*.gpcoder.com";

public static final String GPCODER_ROUTING_KEY = "#.gpcoder.com";

// Message key

public static final String JAVA_CORE_MSG_KEY = "java.core.gpcoder.com";

public static final String JAVA_MSG_KEY = "java.gpcoder.com";

public static final String DESIGN_PATTERN_MSG_KEY = "design-pattern.gpcoder.com";

public static final String NOT_MATCHING_MSG_KEY = "java.collection.gpcoder.com.vn";

private Constant() {
super();
}
}

HeadersExchangeProducer.java

package com.gpcoder.exchange2exchange;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import static com.gpcoder.exchange2exchange.Constant.*;

public class HeadersExchangeProducer {

private ExchangeChannel channel;

public void start() throws IOException, TimeoutException {
// Create connection
Connection connection = ConnectionManager.createConnection();

// Create channel
channel = new ExchangeChannel(connection, HEADERS_EXCHANGE_NAME);

// Create headers exchange
channel.declareExchange(BuiltinExchangeType.HEADERS);

// Create queues
channel.declareQueues(DEV_QUEUE_NAME, MANAGER_QUEUE_NAME, PUBLISHED_QUEUE_NAME);

// Binding queues with headers
Map<String, Object> devGroup = new HashMap<>();
devGroup.put("x-match", "any"); // Match any of the header
devGroup.put("dev", "Developer Channel");
devGroup.put("general", "General Channel");

Map<String, Object> managerGroup = new HashMap<>();
managerGroup.put("x-match", "any"); // Match any of the header
managerGroup.put("dev", "Developer Channel");
managerGroup.put("manager", "Manager Channel");
managerGroup.put("general", "General Channel");

Map<String, Object> publishedGroup = new HashMap<>();
publishedGroup.put("x-match", "all"); // Match all of the header
publishedGroup.put("general", "General Channel");
publishedGroup.put("access", "publish");

channel.performQueueBinding(DEV_QUEUE_NAME, "", devGroup);
channel.performQueueBinding(MANAGER_QUEUE_NAME, "", managerGroup);
channel.performQueueBinding(PUBLISHED_QUEUE_NAME, "", publishedGroup);
}
}

TopicExchangeProducer.java

package com.gpcoder.exchange2exchange;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import static com.gpcoder.exchange2exchange.Constant.*;

public class TopicExchangeProducer {

private ExchangeChannel channel;

public void start() throws IOException, TimeoutException {
// Create connection
Connection connection = ConnectionManager.createConnection();

// Create channel
channel = new ExchangeChannel(connection, TOPIC_EXCHANGE_NAME);

// Create topic exchange
channel.declareExchange(BuiltinExchangeType.TOPIC);

// Create queues
channel.declareQueues(JAVA_QUEUE_NAME, ALL_QUEUE_NAME);

// Binding queues without headers
channel.performQueueBinding(DEV_QUEUE_NAME, JAVA_ROUTING_KEY, null);
channel.performQueueBinding(MANAGER_QUEUE_NAME, GPCODER_ROUTING_KEY, null);

// Binding headers exchange to topic exchange
channel.performExchangeBinding(HEADERS_EXCHANGE_NAME, TOPIC_EXCHANGE_NAME, GPCODER_ROUTING_KEY);
}

public void send(String message, String messageKey, Map<String, Object> headers) throws IOException {
// Send message
channel.publishMessage(message, messageKey, headers);
}
}

HeadersExchangeConsumer.java

package com.gpcoder.exchange2exchange;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import static com.gpcoder.exchange2exchange.Constant.*;

public class HeadersExchangeConsumer {

private ExchangeChannel channel;

public void start() throws IOException, TimeoutException {
// Create connection
Connection connection = ConnectionManager.createConnection();

// Create channel
channel = new ExchangeChannel(connection, HEADERS_EXCHANGE_NAME);

// Create headers exchange
channel.declareExchange(BuiltinExchangeType.HEADERS);

// Create queues
channel.declareQueues(DEV_QUEUE_NAME, MANAGER_QUEUE_NAME, PUBLISHED_QUEUE_NAME);

// Binding queues with headers
Map<String, Object> devGroup = new HashMap<>();
devGroup.put("x-match", "any"); // Match any of the header
devGroup.put("dev", "Developer Channel");
devGroup.put("general", "General Channel");

Map<String, Object> managerGroup = new HashMap<>();
managerGroup.put("x-match", "any"); // Match any of the header
managerGroup.put("dev", "Developer Channel");
managerGroup.put("manager", "Manager Channel");
managerGroup.put("general", "General Channel");

Map<String, Object> publishedGroup = new HashMap<>();
publishedGroup.put("x-match", "all"); // Match all of the header
publishedGroup.put("general", "General Channel");
publishedGroup.put("access", "publish");

channel.performQueueBinding(DEV_QUEUE_NAME, "", devGroup);
channel.performQueueBinding(MANAGER_QUEUE_NAME, "", managerGroup);
channel.performQueueBinding(PUBLISHED_QUEUE_NAME, "", publishedGroup);
}

public void subscribe() throws IOException {
// Subscribe message
channel.subscribeMessage(DEV_QUEUE_NAME);
channel.subscribeMessage(MANAGER_QUEUE_NAME);
channel.subscribeMessage(PUBLISHED_QUEUE_NAME);
}
}

TopicExchangeConsumer.java

package com.gpcoder.exchange2exchange;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import static com.gpcoder.exchange2exchange.Constant.*;
import static com.gpcoder.exchange2exchange.Constant.PUBLISHED_QUEUE_NAME;

public class TopicExchangeConsumer {

private ExchangeChannel channel;

public void start() throws IOException, TimeoutException {
// Create connection
Connection connection = ConnectionManager.createConnection();

// Create channel
channel = new ExchangeChannel(connection, TOPIC_EXCHANGE_NAME);

// Create topic exchange
channel.declareExchange(BuiltinExchangeType.TOPIC);

// Create queues
channel.declareQueues(JAVA_QUEUE_NAME, ALL_QUEUE_NAME);

// Binding queues without headers
channel.performQueueBinding(JAVA_QUEUE_NAME, JAVA_ROUTING_KEY, null);
channel.performQueueBinding(ALL_QUEUE_NAME, GPCODER_ROUTING_KEY, null);

// Binding headers exchange to topic exchange
channel.performExchangeBinding(HEADERS_EXCHANGE_NAME, TOPIC_EXCHANGE_NAME, GPCODER_ROUTING_KEY);
}

public void subscribe() throws IOException {
// Subscribe message
channel.subscribeMessage(JAVA_QUEUE_NAME);
channel.subscribeMessage(ALL_QUEUE_NAME);
}
}

App.java

package com.gpcoder.exchange2exchange;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import static com.gpcoder.exchange2exchange.Constant.DESIGN_PATTERN_MSG_KEY;

public class App {

public static void main(String[] args) throws IOException, TimeoutException {
HeadersExchangeProducer producer1 = new HeadersExchangeProducer();
producer1.start();

TopicExchangeProducer producer2 = new TopicExchangeProducer();
producer2.start();

// Publish some messages
Map<String, Object> devHeader = new HashMap<>();
devHeader.put("dev", "Developer Channel");
producer2.send("[1] Head First Design Pattern", DESIGN_PATTERN_MSG_KEY, devHeader);

HeadersExchangeConsumer consumer1 = new HeadersExchangeConsumer();
consumer1.start();
consumer1.subscribe();

TopicExchangeConsumer consumer2 = new TopicExchangeConsumer();
consumer2.start();
consumer2.subscribe();
}
}

Output của chương trình:

[Send] [{dev=Developer Channel}]: [1] Head First Design Pattern
[Received] [QDeveloper]: amq.ctag-QVREG0uBg6XsIebvbecqCg
[Received] [QDeveloper]: [1] Head First Design Pattern
[Received] [QManager]: amq.ctag-vLHleyPNedl2ZiMYukIing
[Received] [QManager]: [1] Head First Design Pattern
[Received] [QAll]: amq.ctag-GymeB0uwpALjgoM3RP_cvg
[Received] [QAll]: [1] Head First Design Pattern

Như bạn thấy, Message được gởi tới Exchange với routing key là design-pattern.gpcoder.com và header là {“dev”, “Developer Channel”} sẽ được chuyển tới Queue QAll và GPCoderHeadersExchange. Sau đó GPCoderHeadersExchange sẽ chuyển đến Queue QDeveloper và QManager dựa vào header {“dev”, “Developer Channel”}.

Tóm lại, Exchange Binding trong RabbitMQ là một tính năng vô cùng mạnh mẽ, giúp chúng ta kết hợp nhiều loại Exchange lại với nhau, giúp hệ thống mở rộng tốt hơn và đáp ứng được hầu hết yêu cầu phức tạp của hệ thống.

Bài viết gốc được đăng tải tại gpcoder.com

Có thể bạn quan tâm:

Xem thêm Việc làm Developer hấp dẫn trên Station D

Bài viết liên quan

Ngành IT: Làm việc “trên mây” kiếm nhiều tiền nhất hiện nay

Ngành IT: Làm việc “trên mây” kiếm nhiều tiền nhất hiện nay

Kết quả từ cuộc khảo sát đầu năm của Station D về lương bổng của lập trình viên cho thấy nhiều thay đổi đã và đang diễn ra trong ngành IT – cuộc khảo sát tập trung vào các câu hỏi về khối lượng công việc, triển vọng cũng như...

By stationd
Đâu chỉ mỗi Bitcoin, công nghệ Blockchain còn nhiều ứng dụng hơn thế!

Đâu chỉ mỗi Bitcoin, công nghệ Blockchain còn nhiều ứng dụng hơn thế!

Khi nhắc đến blockchain , lập tức mọi người thường nghĩ ngay đến các loại tiền mã hóa, chẳng hạn như bitcoin. Tuy nhiên, blockchain lại là công nghệ tạo ra tiền mã hóa nhưng bản thân công nghệ này không phải là tiền mã hóa như cách mà chúng...

By stationd
Mock phương thức static trong Unit Test sử dụng PowerMock

Mock phương thức static trong Unit Test sử dụng PowerMock

Bài viết được sự cho phép của tác giả Nguyễn Hữu Khanh Trong bài viết này, mình sẽ hướng dẫn các bạn Mock các phương thức static trong Unit Test các bạn nhé! Nếu bạn nào chưa biết về Mock trong Unit Test thì mình có thể nói sơ qua...

By stationd
Một "thuật ngữ ma" đã tồn tại 75 năm trên internet, nó đang "ám" vào các mô hình AI, và sẽ còn tiếp tục tồn tại cho đến vĩnh cửu

Một "thuật ngữ ma" đã tồn tại 75 năm trên internet, nó đang "ám" vào các mô hình AI, và sẽ còn tiếp tục tồn tại cho đến vĩnh cửu

Một lời cảnh báo cho những người thích trích dẫn kiểu "nguồn sưu tầm", "nguồn internet" hay "nguồn AI", họ có thể sẽ đào lên được những "hóa thạch số" vô nghĩa.

By admin
Cảnh Báo Malware Giả Mạo Hợp Đồng Việc Làm: Tập Tin .EXE Nguy Hiểm Đội Lốt PDF/Word

Cảnh Báo Malware Giả Mạo Hợp Đồng Việc Làm: Tập Tin .EXE Nguy Hiểm Đội Lốt PDF/Word

Kẻ xấu đang lợi dụng nhu cầu tìm việc để phát tán phần mềm độc hại (malware) dưới dạng tệp 'hợp đồng' giả mạo. Hãy cảnh giác với những file có icon Word/PDF nhưng thực chất là .exe. Nếu mở, máy tính của bạn có thể bị đánh cắp toàn bộ thông tin cá nhân, cookie và mật khẩu.

By admin