# 前言
上一篇中已经完成了 Ocelot + Consul 的搭建,这篇简单说一下事件总线(EventBus
)。
# 事件总线
什么是事件总线?
事件总线是对观察者(发布-订阅)模式的一种实现。它是一种集中式事件处理机制,允许不同的组件之间进行彼此通信而又不需要相互依赖,达到解耦的目的
为什么要使用事件总线?
- 以当前项目举例,假设有一个订单服务,一个产品服务。客户端有一个下单功能,下单时调用订单服务的下单接口,下单接口需要调用产品服务的减库存接口,这涉及到服务与服务之间的调用。服务之间调用可以选择
RestAPI
或者效率更高的gRPC
。可能这两者各有各的使用场景,但是它们都存在服务之间的耦合问题,或者难以做到异步调用- 假设下单调用订单服务,订单服务需要调用产品服务,产品服务又要调用物流服务,物流服务再去调用xx服务等等,如果每个服务处理时间需要2s,不使用异步处理的话,响应时间可想而知。如果使用EventBus的话,那么订单服务只需要向EventBus发一个“下单事件”就可以了。产品服务会订阅“下单事件”,当产品服务收到下单事件时,自己去减库存。这样就避免了两个服务之间直接调用的耦合性,并且真正做到了异步调用
既然涉及到多个服务之间的异步调用,那么就不得不提分布式事务。分布式事务并不是微服务独有的问题,而是所有的分布式系统都会存在的问题。关于分布式事务,可以查一下 “CAP原则” 和 “BASE理论” 了解更多。如今分布式系统更多时候会追求事务的最终一致性。
下面使用开源框架 CAP
来演示 EventBus
的基本使用。之所以使用 CAP
是因为它既能解决分布式系统的最终一致性,同时又是一个 EventBus
,它具备EventBus
的所有功能。点击了解更多 (opens new window)。
# CAP
目前 CAP 支持使用 RabbitMQ
,Kafka
,Azure Service Bus
等进行底层之间的消息发送,不需要具备这些消息队列的使用经验就可以轻松的集成到项目中。CAP 目前支持使用 Sql Server
,MySql
,PostgreSql
,MongoDB
数据库的项目。这里选择:消息组件使用 RabbitMq
,数据库存储使用 SqlServer
。
Nuget
安装 :
Microsoft.EntityFrameworkCore
Microsoft.EntityFrameworkCore.Tools
Microsoft.EntityFrameworkCore.SqlServer
DotNetCore.CAP
DotNetCore.CAP.RabbitMQ
DotNetCore.CAP.SqlServer
2
3
4
5
6
# Product.Api
新增 Product.Api
作为产品服务,代码结构与 Order.Api
结构类似:
# ProductsController.cs
增加减库存接口:
using DotNetCore.CAP;
using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Newtonsoft.Json;
using Product.Api.Models;
using System;
using System.Threading.Tasks;
namespace Product.Api.Controller
{
[Route("[Controller]")]
[ApiController]
public class ProductsController : ControllerBase
{
private readonly IConfiguration configuration;
private readonly ICapPublisher capBus;
private readonly ProductContext context;
public ProductsController(IConfiguration configuration, ICapPublisher capBus, ProductContext context)
{
this.configuration = configuration;
this.capBus = capBus;
this.context = context;
}
[HttpGet]
public IActionResult Index()
{
string result = $"产品服务:{DateTime.Now:yyyy-MM-dd HH:mm:ss},-{Request.HttpContext.Connection.LocalIpAddress}:{configuration["ConsulSetting:ServicePort"]}";
return Ok(result);
}
/// <summary>
/// 减库存 订阅下单事件
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
[NonAction]
[CapSubscribe("order.services.createorder")]
public async Task ReduceStock(CreateOrderMessageDto message)
{
Console.WriteLine("message:" + JsonConvert.SerializeObject(message));
var product = await context.Products.FirstOrDefaultAsync(p => p.ID == message.ProductID);
product.Stock -= message.Count;
await context.SaveChangesAsync();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# CreateOrderMessageDto.cs
namespace Product.Api.Models
{
/// <summary>
/// 下单事件消息
/// </summary>
public class CreateOrderMessageDto
{
/// <summary>
/// 产品ID
/// </summary>
public int ProductID { get; set; }
/// <summary>
/// 购买数量
/// </summary>
public int Count { get; set; }
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# Product.cs
using System.ComponentModel.DataAnnotations;
using System.ComponentModel.DataAnnotations.Schema;
namespace Product.Api.Models
{
public class Product
{
[Key]
public int ID { get; set; }
/// <summary>
/// 产品名称
/// </summary>
[Required]
[Column(TypeName = "VARCHAR(16)")]
public string Name { get; set; }
/// <summary>
/// 库存
/// </summary>
[Required]
public int Stock { get; set; }
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# ProductContext.cs
using Microsoft.EntityFrameworkCore;
namespace Product.Api
{
public class ProductContext : DbContext
{
public ProductContext(DbContextOptions<ProductContext> options)
: base(options)
{
}
public DbSet<Models.Product> Products { get; set; }
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
//初始化种子数据
modelBuilder.Entity<Models.Product>().HasData(new Models.Product
{
ID = 1,
Name = "ThinkPad",
Stock = 100
},
new Models.Product
{
ID = 2,
Name = "Mac",
Stock = 100
});
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# appsettings.json
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft": "Warning",
"Microsoft.Hosting.Lifetime": "Information"
}
},
"AllowedHosts": "*",
"ConsulSetting": {
"ServiceName": "product.service",
"ServiceIP": "192.168.31.191",
"ServiceHealthCheck": "/healthcheck",
"ConsulAddress": "http://192.168.31.191:8500"
},
"ConnectionString": "Server=192.168.31.210;Database=Microservice.Sample.Product;user id=sa;password=wpl19950815;MultipleActiveResultSets=true"
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# Startup.cs
public void ConfigureServices(IServiceCollection services)
{
services.AddControllers();
services.AddDbContext<ProductContext>(opt => opt.UseSqlServer(Configuration["ConnectionString"]));
services.AddCap(x =>
{
x.UseEntityFramework<ProductContext>().UseRabbitMQ(option =>
{
option.HostName = "192.168.31.191";
option.UserName = "guest";
option.Password = "guest";
});
});
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# Order.Api
# OrdersController.cs
增加下单接口:
using DotNetCore.CAP;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Configuration;
using Order.Api.Models;
using System;
using System.Threading.Tasks;
namespace Order.Api.Controller
{
[Route("[Controller]")]
[ApiController]
public class OrdersController : ControllerBase
{
private readonly IConfiguration configuration;
private readonly ICapPublisher capBus;
private readonly OrderContext context;
public OrdersController(IConfiguration configuration, ICapPublisher capBus, OrderContext context)
{
this.configuration = configuration;
this.capBus = capBus;
this.context = context;
}
[HttpGet]
public IActionResult Index()
{
string result = $"订单服务:{DateTime.Now:yyyy-MM-dd HH:mm:ss},-{Request.HttpContext.Connection.LocalIpAddress}:{configuration["ConsulSetting:ServicePort"]}";
return Ok(result);
}
/// <summary>
/// 创建订单
/// </summary>
/// <param name="order"></param>
/// <returns></returns>
[Route("Create")]
[HttpPost]
public async Task<IActionResult> CreateOrder(Models.Order order)
{
using (var trans = context.Database.BeginTransaction(capBus, autoCommit: true))
{
order.CreateTime = DateTime.Now;
context.Orders.Add(order);
var result = await context.SaveChangesAsync() > 0;
if (result)
{
// 发布下单事件
await capBus.PublishAsync("order.services.createorder",
new CreateOrderMessageDto() { Count = order.Count, ProductID = order.ProductID });
return Ok();
}
return BadRequest();
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# CreateOrderMessageDto.cs
namespace Order.Api.Models
{
/// <summary>
/// 下单事件消息
/// </summary>
public class CreateOrderMessageDto
{
/// <summary>
/// 产品ID
/// </summary>
public int ProductID { get; set; }
/// <summary>
/// 购买数量
/// </summary>
public int Count { get; set; }
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# Order.cs
using System;
using System.ComponentModel.DataAnnotations;
namespace Order.Api.Models
{
public class Order
{
[Key]
public int ID { get; set; }
/// <summary>
/// 下单时间
/// </summary>
[Required]
public DateTime CreateTime { get; set; }
/// <summary>
/// 产品ID
/// </summary>
[Required]
public int ProductID { get; set; }
/// <summary>
/// 购买数量
/// </summary>
[Required]
public int Count { get; set; }
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# OrderContext.cs
using Microsoft.EntityFrameworkCore;
namespace Order.Api
{
public class OrderContext : DbContext
{
public OrderContext(DbContextOptions<OrderContext> options)
: base(options)
{
}
public DbSet<Models.Order> Orders { get; set; }
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# appsettings.json
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft": "Warning",
"Microsoft.Hosting.Lifetime": "Information"
}
},
"AllowedHosts": "*",
"ConsulSetting": {
"ServiceName": "order.service",
"ServiceIP": "192.168.31.191",
"ServiceHealthCheck": "/healthcheck",
"ConsulAddress": "http://192.168.31.191:8500"
},
"ConnectionString": "Server=192.168.31.210;Database=Microservice.Sample.Order;user id=sa;password=wpl19950815;MultipleActiveResultSets=true"
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# Startup.cs
public void ConfigureServices(IServiceCollection services)
{
services.AddControllers();
services.AddDbContext<OrderContext>(opt => opt.UseSqlServer(Configuration["ConnectionString"]));
services.AddCap(x =>
{
x.UseEntityFramework<OrderContext>().UseRabbitMQ(option =>
{
option.HostName = "192.168.31.191";
option.UserName = "guest";
option.Password = "guest";
});
});
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
以上就是产品服务的新增以及订单服务的部分代码调整,功能很简单:各自添加自己的数据库表,订单服务增加下单接口,下单接口会发出“下单事件”。产品服务增加减库存接口,减库存接口会订阅“下单事件”。然后客户端调用下单接口下单时,产品服务会减去相应的库存。关于EF数据库迁移之类的基本使用不做介绍。
# 重新构建镜像
[root@centos-01 dotnetcore_src]# cd order.api.release/
[root@centos-01 order.api.release]# docker build -t order.api .
Sending build context to Docker daemon 16.05MB
Step 1/4 : FROM mcr.microsoft.com/dotnet/aspnet:5.0 AS base
---> a2be3e478ffa
Step 2/4 : WORKDIR /app
---> Using cache
---> 9f551bd1698a
Step 3/4 : COPY . /app
---> e19ab440e8a5
Step 4/4 : ENTRYPOINT ["dotnet", "Order.Api.dll"]
---> Running in 3d1e4110f02e
Removing intermediate container 3d1e4110f02e
---> 06322a6c6e83
Successfully built 06322a6c6e83
Successfully tagged order.api:latest
[root@centos-01 order.api.release]# cd ../product.api.release/
[root@centos-01 product.api.release]# docker build -t product.api .
Sending build context to Docker daemon 16.38MB
Step 1/4 : FROM mcr.microsoft.com/dotnet/aspnet:5.0 AS base
---> a2be3e478ffa
Step 2/4 : WORKDIR /app
---> Using cache
---> 9f551bd1698a
Step 3/4 : COPY . /app
---> 6f6d08e02d78
Step 4/4 : ENTRYPOINT ["dotnet", "Product.Api.dll"]
---> Running in 7616a505741e
Removing intermediate container 7616a505741e
---> 6be08521c6fe
Successfully built 6be08521c6fe
Successfully tagged product.api:latest
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
运行订单服务,产品服务:
docker run -d --name order.api -p 80:80 order.api --ConsulSetting:ServicePort="80"
docker run -d --name order.api1 -p 81:80 order.api --ConsulSetting:ServicePort="81"
docker run -d --name order.api2 -p 82:80 order.api --ConsulSetting:ServicePort="82"
docker run -d --name product.api -p 85:80 product.api --ConsulSetting:ServicePort="85"
docker run -d --name product.api1 -p 86:80 product.api --ConsulSetting:ServicePort="86"
docker run -d --name product.api2 -p 87:80 product.api --ConsulSetting:ServicePort="87"
2
3
4
5
6
ocelot.json
增加路由配置:
{
"Routes": [
{
// 路由规则匹配
"DownstreamPathTemplate": "/orders/{url}",
"DownstreamScheme": "http",
"UpstreamPathTemplate": "/orders/{url}",
// 增加Post请求
"UpstreamHttpMethod": [ "Get", "Post" ],
"ServiceName": "order.service",
"LoadBalancerOptions": {
"Type": "RoundRobin"
},
// 缓存
"FileCacheOptions": {
"TtlSeconds": 5,
"Region": "regionname"
},
// 限流
"RateLimitOptions": {
"ClientWhitelist": [ "SuperClient" ],
"EnableRateLimiting": true,
"Period": "2s",
"PeriodTimespan": 2,
"Limit": 1
},
// 超时熔断
"QoSOptions": {
"ExceptionsAllowedBeforeBreaking": 3,
"DurationOfBreak": 10000,
"TimeoutValue": 5000
}
},
{
"DownstreamPathTemplate": "/products",
"DownstreamScheme": "http",
"UpstreamPathTemplate": "/products",
"UpstreamHttpMethod": [ "Get" ],
"ServiceName": "product.service",
"LoadBalancerOptions": {
"Type": "RoundRobin"
},
// 缓存
"FileCacheOptions": {
"TtlSeconds": 5,
"Region": "regionname"
},
// 限流
"RateLimitOptions": {
"ClientWhitelist": [ "SuperClient" ],
"EnableRateLimiting": true,
"Period": "2s",
"PeriodTimespan": 2,
"Limit": 1
},
// 超时熔断
"QoSOptions": {
"ExceptionsAllowedBeforeBreaking": 3,
"DurationOfBreak": 10000,
"TimeoutValue": 5000
}
}
],
"GlobalConfiguration": {
"BaseUrl": "http://localhost:5000",
"ServiceDiscoveryProvider": {
"Scheme": "http",
"Host": "192.168.31.191",
"Port": 8500,
"Type": "Consul"
},
"RateLimitOptions": {
"DisableRateLimitHeaders": false,
"QuotaExceededMessage": "too many requests...",
"HttpStatusCode": 999,
"ClientIdHeader": "Test"
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
至此整个环境就有点复杂了。要确保 SqlServer,RabbitMQ,Consul,服务实例、Gateway都正常运行:
cap.published
表和 cap.received
表由 CAP
自动生成,内部使用本地消息表+MQ来实现异步确保。
# 测试
使用Postman作为客户端调用下单接口(5000是Ocelot网关端口):
订单库:
产品库:
至此虽然功能很简单,但是实现了服务的解耦,异步调用,和最终一致性。要注意的是:
- 这里的事务是指:订单持久化到数据库/和下单事件保存到
cap.published
表(保存到cap.published
表理论上代表消息正常发送到MQ),要么一同成功,要么一同失败。如果这个事务成功,那么就可以认为这个业务流程是成功的 - 产品服务的减库存是否成功那是产品服务的事,理论上也应该是成功的。因为消息已经确保发到了MQ,产品服务必然会收到消息。CAP也提供了失败重试,和失败回调机制,要理解 “CAP 是基于MQ加本地消息表来实现异步确保”
- 如果下单成功但是库存不足导致减库存失败了怎么办,是否需要回滚订单表的数据?如果产生这种想法,说明还没有真正理解最终一致性的思想。首先下单前肯定会检查一下库存数量,既然允许下单那么必然是库存充足的。(高并发下保证不超卖是另一个问题这里不考虑)如果非要数据回滚也是能实现的,CAP的
ICapPublisher.Publish
方法提供一个callbackName
参数,当减库存时,可以触发这个回调。其本质也是通过发布订阅完成,但不推荐 - CAP无法保证消息不重复,实际使用中需要自己考虑一下实现消息的重复过滤和幂等
← 微服务之网关 Consul服务注册发现 →