消息队列

本文整理了一些消息队列中间件的安装、配置和使用,以及心得体会

1. 概念#

MQ(Message Queue)消息队列,是基础数据结构中“先进先出”的一种数据结构。指把要传输的数据(消息)放在队列中,用队列机制来实现消息传递——生产者产生消息并把消息放入队列,然后由消费者去处理。消费者可以到指定队列拉取消息,或者订阅相应的队列,由 MQ 服务端给其推送消息。

2. 应用场景#

  • 多进程通信
  • 异步消息
  • 分布式应用

3. MQ 中间件介绍#

3.1 MSMQ#

3.1.1 安装#

参考安装“消息队列 (MSMQ)” - WCF

3.1.2 使用感受#

  • 需要在程序和功能界面单独添加该功能
  • 工作组计算机只能创建专用队列,没添加到域无法创建公共队列
  • 不支持多消费者,需要自己编码实现多消费者逻辑
  • 适用于点对点通信
  • 消息序列化为XML进行保存和传输

3.1.3 代码片段#

private const string MessageQueuePath = @".\Private$\AppQueue";// "Label:TheLabel";// @".\PublicAppQueue";// @".\Private$\AppQueue";
private static string userName;
private static List<string> messages = new List<string>();

public MainWindow()
{
    InitializeComponent();

    user.Text = $"用户 {Guid.NewGuid().ToString("N").Substring(0, 5)}";
    userName = user.Text;

    MessageQueue messageQueue = GetMessageQueue();
    messageQueue.PeekCompleted += MessageQueue_PeekCompleted;
    messageQueue.BeginPeek();
}

private void MessageQueue_PeekCompleted(object sender, PeekCompletedEventArgs e)
{
    if (sender is MessageQueue messageQueue)
    {
        var message = messageQueue.EndPeek(e.AsyncResult);

        if (!messages.Contains(message.Id))
        {
            messages.Add(message.Id);

            var currentUser = message.Label == userName;
            if (!currentUser)
            {
                message = messageQueue.Receive();
            }

            var msg = (string)message.Body;
            var flowDirection = currentUser ? FlowDirection.RightToLeft : FlowDirection.LeftToRight;
            var textAlignment = currentUser ? HorizontalAlignment.Right : HorizontalAlignment.Left;
            var content = $"{message.Label}: {msg}";

            Dispatcher.Invoke(() =>
                              {
                recv.Items.Add(new ListBoxItem
                               {
                    Content = content,
                    HorizontalContentAlignment = textAlignment,
                    FlowDirection = flowDirection,
                });
            });
        }

        messageQueue.BeginPeek();
    }
}

private void Button_Click(object sender, RoutedEventArgs e)
{
    MessageQueue messageQueue = GetMessageQueue();
    messageQueue.Send(new Message { Body = send.Text }, userName);

    send.Text = string.Empty;
}

private static MessageQueue GetMessageQueue()
{
    MessageQueue messageQueue;
    if (!MessageQueue.Exists(MessageQueuePath))
    {
        messageQueue = MessageQueue.Create(MessageQueuePath);
    }
    else
    {
        messageQueue = new MessageQueue(MessageQueuePath);
    }
    messageQueue.Label = userName;
    messageQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });

    return messageQueue;
}

3.2 RabbitMQ#

3.2.1 安装#

参考Downloading and Installing RabbitMQ — RabbitMQ 安装 Erlang 和 RabbitMQ 之后

  1. 找到 .erlang.cookie 文件,保证文件内容一致

  2. 通过命令 .\rabbitmq-plugins.bat enable rabbitmq_management 启动管理 UI,访问 http://localhost:15672/

  3. 创建 RabbitMQ 用户

    .\rabbitmqctl.bat add_user username password
  4. 设置管理员权限

    .\rabbitmqctl.bat set_user_tags admin administrator
  5. 添加 NuGet 包:RabbitMQ.Client

3.2.2 使用感受#

  • 最新版本不支持 .NET Framework 4.5
  • connection 和 channel 在程序退出时必须释放,否则应用进程不退出
  • 可以通过事件接受新消息,消息默认按顺序发给多个消费者

3.2.3 代码片段#

ConnectionFactory factory = new ConnectionFactory() { HostName = "localhost" };
IConnection connection;
IModel channel;

public MainWindow()
{
    InitializeComponent();

    connection = factory.CreateConnection();
    channel = connection.CreateModel();

    channel.QueueDeclare("hello", 
        durable: false, // 消息持久性 设为true可以保证任务队列中的消息在RabbitMQ重启时不会丢失
        false, false, null);
    channel.BasicQos(0, 1, false);

    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += Consumer_Received;

    channel.BasicConsume("hello", true, consumer);
}

// 释放RabbitMQ资源,否则进程不会退出
protected override void OnClosed(EventArgs e)
{
    channel.Dispose();
    connection.Dispose();

    base.OnClosed(e);
}

private void Consumer_Received(object sender, BasicDeliverEventArgs e)
{
    var body = e.Body.ToArray();
    var msg = Encoding.UTF8.GetString(body);
    Dispatcher.Invoke(() =>
    {
        recv.Text += msg + Environment.NewLine;
    });
}

private void Button_Click(object sender, RoutedEventArgs e)
{
    var msg = send.Text;

    if (string.IsNullOrEmpty(msg))
    {
        return;
    }

    send.Text = string.Empty;

    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        channel.QueueDeclare("hello", false, false, false, null);

        var body = Encoding.UTF8.GetBytes(msg);

        // 将消息标记为持久,此方法并不能完全保证 参考;https://www.rabbitmq.com/tutorials/tutorial-two-dotnet.html
        var properties = channel.CreateBasicProperties();
        properties.Persistent = true;

        channel.BasicPublish("", "hello", null, body);
    }
}

3.3 Kafka#

3.3.1 安装#

  1. 下载最新稳定版本二进制文件
  2. 解压并在命令行打开
  3. 启动一个 ZooKeeper 服务器
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
  1. 启动 Kafka 服务器
.\bin\windows\kafka-server-start.bat .\config\server.properties
  1. 创建一个 Topic
.\bin\windows\kafka-topics.bat --create --topic quickstart-events --bootstrap-server localhost:9092

端口号为 .\config\server.properties 中配置的服务器的端口号

  1. 查看 Topic
.\bin\windows\kafka-topics.bat --describe --topic quickstart-events --bootstrap-server localhost:9092
  1. 发送消息
.\bin\windows\kafka-console-producer.bat --topic quickstart-events --bootstrap-server localhost:9092
  1. 读取消息
.\bin\windows\kafka-console-consumer.bat --topic quickstart-events --from-beginning --bootstrap-server localhost:9092

3.3.2 使用感受#

3.3.2.1 KNet

  • 不支持 .NET Framework 4.5
  • 由于 KNet 使用了 JNet 包括 JCOBridge,而 JCOBridge 需要填写申请表申请授权,故没有体验

3.3.2.2 Confluent.Kafka(微软推荐)

  • 支持 .NET Framework 4.5
  • 引用 NuGet 包 Confluent.Kafka.StrongName
  • 支持多消费者模式
  • Windows 会在 Kafka 所在磁盘根目录下创建 tmp 文件夹,存放 log 和 topic
  • consumer 不支持事件通知,需要在线程里查询是否有新消息

3.3.3 代码片段#

const string Topic = "test-topic";

IProducer<Null, string> producer;
IConsumer<Ignore, string> consumer;

public MainWindow()
{
    InitializeComponent();

    var config = new ProducerConfig { BootstrapServers = "localhost:9092" };

    producer = new ProducerBuilder<Null, string>(config).Build();

    var conf = new ConsumerConfig
    {
        //相同GroupId的消费者,不会同时消费同一个分区的Topic
        GroupId = "test-consumer-group" + DateTime.Now.Ticks,
        BootstrapServers = "localhost:9092",

        // Note: The AutoOffsetReset property determines the start offset in the event
        // there are not yet any committed offsets for the consumer group for the
        // topic/partitions of interest. By default, offsets are committed
        // automatically, so in this example, consumption will only start from the
        // earliest message in the topic 'my-topic' the first time you run the program.
        AutoOffsetReset = AutoOffsetReset.Earliest,
    };

    consumer = new ConsumerBuilder<Ignore, string>(conf).Build();
    consumer.Subscribe(Topic);

    Task.Run(() =>
    {
        try
        {
            while (true)
            {
                try
                {
                    var cr = consumer.Consume();
                    Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                    Dispatcher.Invoke(() =>
                    {
                        recv.Text += cr.Value + Environment.NewLine;
                    });
                }
                catch (ConsumeException e)
                {
                    Console.WriteLine($"Error occured: {e.Error.Reason}");
                }
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Error occured: {ex.Message}");
        }
    });
}

private async void Button_Click(object sender, RoutedEventArgs e)
{
    var msg = send.Text;
    if (string.IsNullOrEmpty(msg))
    {
        return;
    }

    try
    {
        var dr = await producer.ProduceAsync(Topic, new Message<Null, string> { Value = msg });
        Console.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");
    }
    catch (ProduceException<Null, string> ex)
    {
        MessageBox.Show($"Delivery failed: {ex.Error.Reason}");
        throw;
    }
}

❤️ 如果这篇文章对你有帮助,欢迎赞助支持我继续维护 ❤️

☕ Support me ⚡ 爱发电赞助