本文介绍如何在使用多个数据库的模块化单体应用中, 通过 Outbox/Inbox
模式实现可靠的事件处理. 我们将以 ModularCRM
项目为例进行说明.
项目背景
ModularCRM
是一个集成了多个 ABP 框架开源模块的单体应用, 包括:
Account
Identity
Tenant Management
Permission Management
Setting Management
等开源模块
除了ABP框架开源模块外, 项目还包含三个业务模块:
订单模块( Products
), 使用MongoDB
数据库产品模块( Ordering
), 使用SQL Server
数据库支付模块( Payment
), 使用MongoDB
数据库
项目在 appsettings.json
中分别为 ModularCRM
和三个业务模块配置了独立的数据库连接字符串:
{
"ConnectionStrings": {
"Default": "Server=localhost,1434;Database=ModularCrm;User Id=sa;Password=1q2w3E***;TrustServerCertificate=true",
"Products": "Server=localhost,1434;Database=ModularCrm_Products;User Id=sa;Password=1q2w3E***;TrustServerCertificate=true",
"Ordering": "mongodb://localhost:27017/ModularCrm_Ordering?replicaSet=rs0",
"Payment": "mongodb://localhost:27017/ModularCrm_Payment?replicaSet=rs0"
}
}
业务场景
这些模块通过 ABP 框架的 DistributedEventBus
进行通信, 实现以下业务流程:
这里我们以一个简单的业务流程为例, 实际业务流程会更复杂. 示例代码主要用于演示和问题解决.
订单模块: 用户下单后发布 OrderPlacedEto
事件产品模块: 订阅 OrderPlacedEto
事件后更新产品库存支付模块: 订阅 OrderPlacedEto
事件后处理支付, 完成后发布PaymentCompletedEto
事件订单模块: 订阅 PaymentCompletedEto
事件后更新订单状态
实现这个流程时, 我们需要确保:
下单操作和事件发布的事务一致性 各模块处理消息时的事务一致性 消息传递的可靠性(包括持久化、确认和重试机制)
仅使用 ABP 框架的 DistributedEventBus
无法满足上述要求, 因此我们需要引入新的机制.
Outbox/Inbox 模式解决方案
为了满足上述要求,我们采用 Outbox/Inbox
模式:
Outbox 模式
将分布式事件与数据库操作在同一事务中保存 通过后台作业将事件发送到分布式消息中间件 确保数据更新与事件发布的一致性 防止系统故障期间的消息丢失
Inbox 模式
先将接收到的分布式事件保存到数据库 通过事务性方式处理事件 通过保存已处理消息来确保消息只被处理一次 维护处理状态以实现可靠处理
如何在项目和模块中启用和配置
Outbox/Inbox
, 请参考:
https://abp.io/docs/latest/framework/infrastructure/event-bus/distributed#
outbox-inbox-for-transactional-events
模块配置
每个模块需要配置独立的 Outbox/Inbox
. 由于是单体应用, 所有消息处理类都在同一个项目中, 我们需要为每个模块配置 Outbox/Inbox
的 Selector/EventSelector
, 以确保模块只发送和接收它关注的消息, 避免消息重复处理.
ModularCRM 主应用配置
它会发送和接收所有ABP框架开源模块的消息.
// This selector will match all abp built-in modules and the current module.
Func<Type, bool> abpModuleSelector = type => type.Namespace != && (type.Namespace.StartsWith("Volo.") || type.Assembly == typeof(ModularCrmModule).Assembly);
Configure(options =>
{
options.Inboxes.Configure("ModularCrm", config =>
{
config.UseDbContext();
config.EventSelector = abpModuleSelector;
config.HandlerSelector = abpModuleSelector;
});
options.Outboxes.Configure("ModularCrm", config =>
{
config.UseDbContext();
config.Selector = abpModuleSelector;
});
});
订单模块配置
它只发送OrderPlacedEto
事件, 并接收PaymentCompletedEto
事件和执行
.
OrderPaymentCompletedEventHandler
Configure(options =>
{
options.Inboxes.Configure(OrderingDbProperties.ConnectionStringName, config =>
{
config.UseMongoDbContext();
config.EventSelector = type => type == typeof(PaymentCompletedEto);
config.HandlerSelector = type => type == typeof(OrderPaymentCompletedEventHandler);
});
options.Outboxes.Configure(OrderingDbProperties.ConnectionStringName, config =>
{
config.UseMongoDbContext();
config.Selector = type => type == typeof(OrderPlacedEto);
});
});
产品模块配置
它只接收EntityCreatedEto
和OrderPlacedEto
事件, 并执行
和
ProductsOrderPlacedEventHandler
. 暂时不需要发送任何事件.
ProductsUserCreatedEventHandler
Configure(options =>
{
options.Inboxes.Configure(ProductsDbProperties.ConnectionStringName, config =>
{
config.UseDbContext();
config.EventSelector = type => type == typeof(EntityCreatedEto) || type == typeof(OrderPlacedEto);
config.HandlerSelector = type => type == typeof(ProductsOrderPlacedEventHandler) || type == typeof(ProductsUserCreatedEventHandler);
});
// Outboxes are not used in this module
options.Outboxes.Configure(ProductsDbProperties.ConnectionStringName, config =>
{
config.UseDbContext();
config.Selector = type => false;
});
});
支付模块配置
它只发送PaymentCompletedEto
事件, 并接收OrderPlacedEto
事件和执行
.
PaymentOrderPlacedEventHandler
Configure(options =>
{
options.Inboxes.Configure(PaymentDbProperties.ConnectionStringName, config =>
{
config.UseMongoDbContext();
config.EventSelector = type => type == typeof(OrderPlacedEto);
config.HandlerSelector = type => type == typeof(PaymentOrderPlacedEventHandler);
});
options.Outboxes.Configure(PaymentDbProperties.ConnectionStringName, config =>
{
config.UseMongoDbContext();
config.Selector = type => type == typeof(PaymentCompletedEto);
});
});
运行ModularCRM模拟业务流程
在 ModularCrm
目录下运行:
# 在Docker中启动SQL Server和MongoDB数据库
docker-compose up -d
# 还原安装依赖项
abp install-lib
# 迁移数据库
dotnet run --project ModularCrm --migrate-database
# 启动应用
dotnet run --project ModularCrm
访问 https://localhost:44303/
进入应用首页
输入一个客户名称然后选择一个产品并提交一个订单. 稍等片刻后刷新页面可以看到订单,产品以及支付信息.
系统日志显示完整的处理流程:
[Ordering Module] Order created: OrderId: b7ad3f47-0e77-bb81-082f-3a1834503e88, ProductId: 0f95689f-4cb6-36f5-68bd-3a18344d32c9, CustomerName: john
[Products Module] OrderPlacedEto event received: OrderId: b7ad3f47-0e77-bb81-082f-3a1834503e88, CustomerName: john, ProductId: 0f95689f-4cb6-36f5-68bd-3a18344d32c9
[Products Module] Stock count decreased for ProductId: 0f95689f-4cb6-36f5-68bd-3a18344d32c9
[Payment Module] OrderPlacedEto event received: OrderId: b7ad3f47-0e77-bb81-082f-3a1834503e88, CustomerName: john, ProductId: 0f95689f-4cb6-36f5-68bd-3a18344d32c9
[Payment Module] Payment processing completed for OrderId: b7ad3f47-0e77-bb81-082f-3a1834503e88
[Ordering Module] PaymentCompletedEto event received: OrderId: b7ad3f47-0e77-bb81-082f-3a1834503e88, PaymentId: d0a41ead-ee0f-714c-e254-3a1834504d65, PaymentMethod: CreditCard, PaymentAmount: ModularCrm.Payment.Payment.PaymentCompletedEto
[Ordering Module] Order state updated to Delivered for OrderId: b7ad3f47-0e77-bb81-082f-3a1834503e88
此外,当新用户注册时,产品模块还会接收到 EntityCreatedEto
事件, 我们会给新用户发送一个邮件, 这只是为了演示Outbox/Inbox的Selector机制.
[Products Module] UserCreated event received: UserId: "9a1f2bd0-5b28-210a-9e56-3a18344d310a", UserName: admin
[Products Module] Sending a popular products email to admin@abp.io...
总结
通过引入 Outbox/Inbox
模式, 我们实现了:
事务性的消息发送和接收 可靠的消息处理机制 多数据库环境下的模块化事件处理
ModularCRM 项目不仅实现了可靠的消息处理, 还展示了如何在单体应用中优雅地处理多数据库场景. 项目源码:
https://github.com/abpframework/abp-samples/tree/master/ModularCrm-OutboxInbox-Pattern
参考资料
Outbox/Inbox for transactional events https://abp.io/docs/latest/framework/infrastructure/event-bus/distributed#outbox-inbox-for-transactional-events ConnectionStrings https://abp.io/docs/latest/framework/fundamentals/connection-strings ABP Studio: Single Layer Solution Template https://abp.io/docs/latest/solution-templates/single-layer-web-application