本文整理了一些消息队列中间件的安装、配置和使用,以及心得体会
1. 概念#
MQ(Message Queue)消息队列,是基础数据结构中“先进先出”的一种数据结构。指把要传输的数据(消息)放在队列中,用队列机制来实现消息传递——生产者产生消息并把消息放入队列,然后由消费者去处理。消费者可以到指定队列拉取消息,或者订阅相应的队列,由 MQ 服务端给其推送消息。
2. 应用场景#
- 多进程通信
- 异步消息
- 分布式应用
3. MQ 中间件介绍#
3.1 MSMQ#
3.1.1 安装#
3.1.2 使用感受#
- 需要在程序和功能界面单独添加该功能
- 工作组计算机只能创建专用队列,没添加到域无法创建公共队列
- 不支持多消费者,需要自己编码实现多消费者逻辑
- 适用于点对点通信
- 消息序列化为XML进行保存和传输
3.1.3 代码片段#
private const string MessageQueuePath = @".\Private$\AppQueue";// "Label:TheLabel";// @".\PublicAppQueue";// @".\Private$\AppQueue";
private static string userName;
private static List<string> messages = new List<string>();
public MainWindow()
{
InitializeComponent();
user.Text = $"用户 {Guid.NewGuid().ToString("N").Substring(0, 5)}";
userName = user.Text;
MessageQueue messageQueue = GetMessageQueue();
messageQueue.PeekCompleted += MessageQueue_PeekCompleted;
messageQueue.BeginPeek();
}
private void MessageQueue_PeekCompleted(object sender, PeekCompletedEventArgs e)
{
if (sender is MessageQueue messageQueue)
{
var message = messageQueue.EndPeek(e.AsyncResult);
if (!messages.Contains(message.Id))
{
messages.Add(message.Id);
var currentUser = message.Label == userName;
if (!currentUser)
{
message = messageQueue.Receive();
}
var msg = (string)message.Body;
var flowDirection = currentUser ? FlowDirection.RightToLeft : FlowDirection.LeftToRight;
var textAlignment = currentUser ? HorizontalAlignment.Right : HorizontalAlignment.Left;
var content = $"{message.Label}: {msg}";
Dispatcher.Invoke(() =>
{
recv.Items.Add(new ListBoxItem
{
Content = content,
HorizontalContentAlignment = textAlignment,
FlowDirection = flowDirection,
});
});
}
messageQueue.BeginPeek();
}
}
private void Button_Click(object sender, RoutedEventArgs e)
{
MessageQueue messageQueue = GetMessageQueue();
messageQueue.Send(new Message { Body = send.Text }, userName);
send.Text = string.Empty;
}
private static MessageQueue GetMessageQueue()
{
MessageQueue messageQueue;
if (!MessageQueue.Exists(MessageQueuePath))
{
messageQueue = MessageQueue.Create(MessageQueuePath);
}
else
{
messageQueue = new MessageQueue(MessageQueuePath);
}
messageQueue.Label = userName;
messageQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });
return messageQueue;
}3.2 RabbitMQ#
3.2.1 安装#
参考Downloading and Installing RabbitMQ — RabbitMQ 安装 Erlang 和 RabbitMQ 之后
-
找到
.erlang.cookie文件,保证文件内容一致 -
通过命令
.\rabbitmq-plugins.bat enable rabbitmq_management启动管理 UI,访问 http://localhost:15672/ -
创建 RabbitMQ 用户
.\rabbitmqctl.bat add_user username password -
设置管理员权限
.\rabbitmqctl.bat set_user_tags admin administrator -
添加 NuGet 包:
RabbitMQ.Client
3.2.2 使用感受#
- 最新版本不支持 .NET Framework 4.5
- connection 和 channel 在程序退出时必须释放,否则应用进程不退出
- 可以通过事件接受新消息,消息默认按顺序发给多个消费者
3.2.3 代码片段#
ConnectionFactory factory = new ConnectionFactory() { HostName = "localhost" };
IConnection connection;
IModel channel;
public MainWindow()
{
InitializeComponent();
connection = factory.CreateConnection();
channel = connection.CreateModel();
channel.QueueDeclare("hello",
durable: false, // 消息持久性 设为true可以保证任务队列中的消息在RabbitMQ重启时不会丢失
false, false, null);
channel.BasicQos(0, 1, false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += Consumer_Received;
channel.BasicConsume("hello", true, consumer);
}
// 释放RabbitMQ资源,否则进程不会退出
protected override void OnClosed(EventArgs e)
{
channel.Dispose();
connection.Dispose();
base.OnClosed(e);
}
private void Consumer_Received(object sender, BasicDeliverEventArgs e)
{
var body = e.Body.ToArray();
var msg = Encoding.UTF8.GetString(body);
Dispatcher.Invoke(() =>
{
recv.Text += msg + Environment.NewLine;
});
}
private void Button_Click(object sender, RoutedEventArgs e)
{
var msg = send.Text;
if (string.IsNullOrEmpty(msg))
{
return;
}
send.Text = string.Empty;
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare("hello", false, false, false, null);
var body = Encoding.UTF8.GetBytes(msg);
// 将消息标记为持久,此方法并不能完全保证 参考;https://www.rabbitmq.com/tutorials/tutorial-two-dotnet.html
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
channel.BasicPublish("", "hello", null, body);
}
}3.3 Kafka#
3.3.1 安装#
- 下载最新稳定版本二进制文件
- 解压并在命令行打开
- 启动一个 ZooKeeper 服务器
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties- 启动 Kafka 服务器
.\bin\windows\kafka-server-start.bat .\config\server.properties- 创建一个 Topic
.\bin\windows\kafka-topics.bat --create --topic quickstart-events --bootstrap-server localhost:9092端口号为 .\config\server.properties 中配置的服务器的端口号
- 查看 Topic
.\bin\windows\kafka-topics.bat --describe --topic quickstart-events --bootstrap-server localhost:9092- 发送消息
.\bin\windows\kafka-console-producer.bat --topic quickstart-events --bootstrap-server localhost:9092- 读取消息
.\bin\windows\kafka-console-consumer.bat --topic quickstart-events --from-beginning --bootstrap-server localhost:90923.3.2 使用感受#
3.3.2.1 KNet
- 不支持 .NET Framework 4.5
- 由于 KNet 使用了 JNet 包括 JCOBridge,而 JCOBridge 需要填写申请表申请授权,故没有体验
3.3.2.2 Confluent.Kafka(微软推荐)
- 支持 .NET Framework 4.5
- 引用 NuGet 包
Confluent.Kafka.StrongName - 支持多消费者模式
- Windows 会在 Kafka 所在磁盘根目录下创建
tmp文件夹,存放 log 和 topic - consumer 不支持事件通知,需要在线程里查询是否有新消息
3.3.3 代码片段#
const string Topic = "test-topic";
IProducer<Null, string> producer;
IConsumer<Ignore, string> consumer;
public MainWindow()
{
InitializeComponent();
var config = new ProducerConfig { BootstrapServers = "localhost:9092" };
producer = new ProducerBuilder<Null, string>(config).Build();
var conf = new ConsumerConfig
{
//相同GroupId的消费者,不会同时消费同一个分区的Topic
GroupId = "test-consumer-group" + DateTime.Now.Ticks,
BootstrapServers = "localhost:9092",
// Note: The AutoOffsetReset property determines the start offset in the event
// there are not yet any committed offsets for the consumer group for the
// topic/partitions of interest. By default, offsets are committed
// automatically, so in this example, consumption will only start from the
// earliest message in the topic 'my-topic' the first time you run the program.
AutoOffsetReset = AutoOffsetReset.Earliest,
};
consumer = new ConsumerBuilder<Ignore, string>(conf).Build();
consumer.Subscribe(Topic);
Task.Run(() =>
{
try
{
while (true)
{
try
{
var cr = consumer.Consume();
Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
Dispatcher.Invoke(() =>
{
recv.Text += cr.Value + Environment.NewLine;
});
}
catch (ConsumeException e)
{
Console.WriteLine($"Error occured: {e.Error.Reason}");
}
}
}
catch (Exception ex)
{
Console.WriteLine($"Error occured: {ex.Message}");
}
});
}
private async void Button_Click(object sender, RoutedEventArgs e)
{
var msg = send.Text;
if (string.IsNullOrEmpty(msg))
{
return;
}
try
{
var dr = await producer.ProduceAsync(Topic, new Message<Null, string> { Value = msg });
Console.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");
}
catch (ProduceException<Null, string> ex)
{
MessageBox.Show($"Delivery failed: {ex.Error.Reason}");
throw;
}
}