上一篇文章中,簡(jiǎn)單的介紹了一下RabbitMQ的work模型。這篇文章來(lái)學(xué)習(xí)一下RabbitMQ中的發(fā)布訂閱模型。
發(fā)布訂閱模型(Publish/Subscribe):簡(jiǎn)單的說(shuō)就是隊(duì)列里面的消息會(huì)被多個(gè)消費(fèi)者同時(shí)接受到,消費(fèi)者接收到的信息一致。
發(fā)布訂閱模型適合于做模塊之間的異步通信。

 img
適用場(chǎng)景
- 發(fā)送并記錄日志信息
 - springcloud的config組件里面通知配置自動(dòng)更新
 - 緩存同步
 - 微信訂閱號(hào)
 
演示
生產(chǎn)者
public class Producer {
    private static final String EXCHANGE_NAME = "exchange_publish_1";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // 聲明交換機(jī)
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // 發(fā)送消息到交換機(jī)
        for (int i = 0; i < 100; i++) {
            channel.basicPublish(EXCHANGE_NAME, "", null, ("發(fā)布訂閱模型的第 " + i + " 條消息").getBytes());
        }
        // 關(guān)閉資源
        channel.close();
        connection.close();
    }
}
消費(fèi)者
// 消費(fèi)者1
public class Consumer {
    private static final String QUEUE_NAME = "queue_publish_1";
    private static final String EXCHANGE_NAME = "exchange_publish_1";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // 聲明隊(duì)列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 聲明交換機(jī)
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // 將隊(duì)列綁定到交換機(jī)
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("隊(duì)列1接收到的消息是:" + new String(body));
            }
        };
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}
// 消費(fèi)者2
public class Consumer2 {
    private static final String QUEUE_NAME = "queue_publish_2";
    private static final String EXCHANGE_NAME = "exchange_publish_1";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // 聲明隊(duì)列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 聲明交換機(jī)
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // 將隊(duì)列綁定到交換機(jī)
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("隊(duì)列2接收到的消息是:" + new String(body));
            }
        };
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}
測(cè)試
先啟動(dòng)2個(gè)消費(fèi)者,再啟動(dòng)生產(chǎn)者


可以看出來(lái)消費(fèi)者1和消費(fèi)者2接收到的消息是一模一樣的 ,每個(gè)消費(fèi)者都收到了生產(chǎn)者發(fā)送的消息;
發(fā)布訂閱模型,用到了一個(gè)新的東西-交換機(jī),這里也解釋一下相關(guān)方法的參數(shù):
// 聲明交換機(jī)
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 該方法的最多參數(shù)的重載方法是:
Exchange.DeclareOk exchangeDeclare(String exchange,
                                    BuiltinExchangeType type,
                                    boolean durable,
                                    boolean autoDelete,
                                    boolean internal,
                                    Map< String, Object > arguments) throws IOException;
/**
 *  param1:exchange,交換機(jī)名稱
 *  param2:type,交換機(jī)類型;直接寫 string效果一致;內(nèi)置了4種交換機(jī)類型:
 *   direct(路由模式)、fanout(發(fā)布訂閱模式)、
 *   topic(topic模式-模糊匹配)、headers(標(biāo)頭交換,由Headers的參數(shù)分配,不常用)
 *  param3:durable,是否持久化交換機(jī)   false:默認(rèn)值,不持久化
 *  param4:autoDelete,沒有消費(fèi)者使用時(shí),是否自動(dòng)刪除交換機(jī)   false:默認(rèn)值,不刪除
 *  param5:internal,是否內(nèi)置,如果設(shè)置 為true,則表示是內(nèi)置的交換器, 客戶端程序無(wú)法直接發(fā)送消息到這個(gè)交換器中, 只能通過(guò)交換器路由到交換器的方式  false:默認(rèn)值,允許外部直接訪問(wèn)
 *  param6:arguments,交換機(jī)的一些其他屬性,默認(rèn)值為 null
 */
// 將隊(duì)列綁定到交換機(jī)
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
/**
 *  param1:destination,目的地,隊(duì)列的名字
 *  param2:source,資源,交換機(jī)的名字
 *  param3:routingKey,路由鍵(目前沒有用到routingKey,填 "" 即可)
 */
小結(jié)
本文到這里就結(jié)束了,介紹了RabbitMQ通信模型中的發(fā)布訂閱模型,適合于做模塊之間的異步通信。
- 
                                交換機(jī)
                                +關(guān)注
關(guān)注
23文章
2856瀏覽量
103618 - 
                                緩存
                                +關(guān)注
關(guān)注
1文章
248瀏覽量
27618 - 
                                模型
                                +關(guān)注
關(guān)注
1文章
3622瀏覽量
51591 - 
                                springcloud
                                +關(guān)注
關(guān)注
0文章
17瀏覽量
1653 - 
                                rabbitmq
                                +關(guān)注
關(guān)注
0文章
20瀏覽量
1229 
發(fā)布評(píng)論請(qǐng)先 登錄
RabbitMQ通信模型中的work模型
    
RabbitMQ是什么
    
          
        
        
RabbitMQ中的發(fā)布訂閱模型
                
 
    
           
            
            
                
            
評(píng)論