# 前言

上一篇中已经完成了 Ocelot + Consul 的搭建,这篇简单说一下事件总线(EventBus)。

# 事件总线

什么是事件总线?

事件总线是对观察者(发布-订阅)模式的一种实现。它是一种集中式事件处理机制,允许不同的组件之间进行彼此通信而又不需要相互依赖,达到解耦的目的

为什么要使用事件总线?

  1. 以当前项目举例,假设有一个订单服务,一个产品服务。客户端有一个下单功能,下单时调用订单服务的下单接口,下单接口需要调用产品服务的减库存接口,这涉及到服务与服务之间的调用。服务之间调用可以选择 RestAPI 或者效率更高的 gRPC。可能这两者各有各的使用场景,但是它们都存在服务之间的耦合问题,或者难以做到异步调用
  2. 假设下单调用订单服务,订单服务需要调用产品服务,产品服务又要调用物流服务,物流服务再去调用xx服务等等,如果每个服务处理时间需要2s,不使用异步处理的话,响应时间可想而知。如果使用EventBus的话,那么订单服务只需要向EventBus发一个“下单事件”就可以了。产品服务会订阅“下单事件”,当产品服务收到下单事件时,自己去减库存。这样就避免了两个服务之间直接调用的耦合性,并且真正做到了异步调用

既然涉及到多个服务之间的异步调用,那么就不得不提分布式事务。分布式事务并不是微服务独有的问题,而是所有的分布式系统都会存在的问题。关于分布式事务,可以查一下 “CAP原则” 和 “BASE理论” 了解更多。如今分布式系统更多时候会追求事务的最终一致性。

下面使用开源框架 CAP来演示 EventBus 的基本使用。之所以使用 CAP 是因为它既能解决分布式系统的最终一致性,同时又是一个 EventBus,它具备EventBus 的所有功能。点击了解更多 (opens new window)

# CAP

目前 CAP 支持使用 RabbitMQKafkaAzure Service Bus 等进行底层之间的消息发送,不需要具备这些消息队列的使用经验就可以轻松的集成到项目中。CAP 目前支持使用 Sql ServerMySqlPostgreSqlMongoDB 数据库的项目。这里选择:消息组件使用 RabbitMq,数据库存储使用 SqlServer

Nuget 安装 :

Microsoft.EntityFrameworkCore
Microsoft.EntityFrameworkCore.Tools
Microsoft.EntityFrameworkCore.SqlServer
DotNetCore.CAP
DotNetCore.CAP.RabbitMQ
DotNetCore.CAP.SqlServer
1
2
3
4
5
6

# Product.Api

新增 Product.Api 作为产品服务,代码结构与 Order.Api 结构类似:

img

# ProductsController.cs

增加减库存接口:

using DotNetCore.CAP;

using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;

using Newtonsoft.Json;

using Product.Api.Models;

using System;
using System.Threading.Tasks;

namespace Product.Api.Controller
{
    [Route("[Controller]")]
    [ApiController]
    public class ProductsController : ControllerBase
    {
        private readonly IConfiguration configuration;
        private readonly ICapPublisher capBus;
        private readonly ProductContext context;

        public ProductsController(IConfiguration configuration, ICapPublisher capBus, ProductContext context)
        {
            this.configuration = configuration;
            this.capBus = capBus;
            this.context = context;
        }

        [HttpGet]
        public IActionResult Index()
        {
            string result = $"产品服务:{DateTime.Now:yyyy-MM-dd HH:mm:ss},-{Request.HttpContext.Connection.LocalIpAddress}:{configuration["ConsulSetting:ServicePort"]}";
            return Ok(result);
        }

        /// <summary>
        /// 减库存 订阅下单事件
        /// </summary>
        /// <param name="message"></param>
        /// <returns></returns>
        [NonAction]
        [CapSubscribe("order.services.createorder")]
        public async Task ReduceStock(CreateOrderMessageDto message)
        {
            Console.WriteLine("message:" + JsonConvert.SerializeObject(message));
            var product = await context.Products.FirstOrDefaultAsync(p => p.ID == message.ProductID);
            product.Stock -= message.Count;
            await context.SaveChangesAsync();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53

# CreateOrderMessageDto.cs

namespace Product.Api.Models
{
    /// <summary>
    /// 下单事件消息
    /// </summary>
    public class CreateOrderMessageDto
    {
        /// <summary>
        /// 产品ID
        /// </summary>
        public int ProductID { get; set; }

        /// <summary>
        /// 购买数量
        /// </summary>
        public int Count { get; set; }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# Product.cs

using System.ComponentModel.DataAnnotations;
using System.ComponentModel.DataAnnotations.Schema;

namespace Product.Api.Models
{
    public class Product
    {
        [Key]
        public int ID { get; set; }

        /// <summary>
        /// 产品名称
        /// </summary>
        [Required]
        [Column(TypeName = "VARCHAR(16)")]
        public string Name { get; set; }

        /// <summary>
        /// 库存
        /// </summary>
        [Required]
        public int Stock { get; set; }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

# ProductContext.cs

using Microsoft.EntityFrameworkCore;

namespace Product.Api
{
    public class ProductContext : DbContext
    {
        public ProductContext(DbContextOptions<ProductContext> options)
           : base(options)
        {

        }

        public DbSet<Models.Product> Products { get; set; }

        protected override void OnModelCreating(ModelBuilder modelBuilder)
        {
            //初始化种子数据
            modelBuilder.Entity<Models.Product>().HasData(new Models.Product
            {
                ID = 1,
                Name = "ThinkPad",
                Stock = 100
            },
            new Models.Product
            {
                ID = 2,
                Name = "Mac",
                Stock = 100
            });
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

# appsettings.json

{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft": "Warning",
      "Microsoft.Hosting.Lifetime": "Information"
    }
  },
  "AllowedHosts": "*",
  "ConsulSetting": {
    "ServiceName": "product.service",
    "ServiceIP": "192.168.31.191",
    "ServiceHealthCheck": "/healthcheck",
    "ConsulAddress": "http://192.168.31.191:8500"
  },
  "ConnectionString": "Server=192.168.31.210;Database=Microservice.Sample.Product;user id=sa;password=wpl19950815;MultipleActiveResultSets=true"
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# Startup.cs

public void ConfigureServices(IServiceCollection services)
{
    services.AddControllers();

    services.AddDbContext<ProductContext>(opt => opt.UseSqlServer(Configuration["ConnectionString"]));

    services.AddCap(x =>
    {
        x.UseEntityFramework<ProductContext>().UseRabbitMQ(option =>
        {
            option.HostName = "192.168.31.191";
            option.UserName = "guest";
            option.Password = "guest";
        });
    });
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# Order.Api

# OrdersController.cs

增加下单接口:

using DotNetCore.CAP;

using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Configuration;

using Order.Api.Models;

using System;
using System.Threading.Tasks;

namespace Order.Api.Controller
{
    [Route("[Controller]")]
    [ApiController]
    public class OrdersController : ControllerBase
    {
        private readonly IConfiguration configuration;
        private readonly ICapPublisher capBus;
        private readonly OrderContext context;

        public OrdersController(IConfiguration configuration, ICapPublisher capBus, OrderContext context)
        {
            this.configuration = configuration;
            this.capBus = capBus;
            this.context = context;
        }

        [HttpGet]
        public IActionResult Index()
        {
            string result = $"订单服务:{DateTime.Now:yyyy-MM-dd HH:mm:ss},-{Request.HttpContext.Connection.LocalIpAddress}:{configuration["ConsulSetting:ServicePort"]}";
            return Ok(result);
        }

        /// <summary>
        /// 创建订单
        /// </summary>
        /// <param name="order"></param>
        /// <returns></returns>
        [Route("Create")]
        [HttpPost]
        public async Task<IActionResult> CreateOrder(Models.Order order)
        {
            using (var trans = context.Database.BeginTransaction(capBus, autoCommit: true))
            {
                order.CreateTime = DateTime.Now;
                context.Orders.Add(order);

                var result = await context.SaveChangesAsync() > 0;

                if (result)
                {
                    // 发布下单事件
                    await capBus.PublishAsync("order.services.createorder",
                        new CreateOrderMessageDto() { Count = order.Count, ProductID = order.ProductID });
                    return Ok();
                }
                return BadRequest();
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62

# CreateOrderMessageDto.cs

namespace Order.Api.Models
{
    /// <summary>
    /// 下单事件消息
    /// </summary>
    public class CreateOrderMessageDto
    {
        /// <summary>
        /// 产品ID
        /// </summary>
        public int ProductID { get; set; }

        /// <summary>
        /// 购买数量
        /// </summary>
        public int Count { get; set; }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# Order.cs

using System;
using System.ComponentModel.DataAnnotations;

namespace Order.Api.Models
{
    public class Order
    {
        [Key]
        public int ID { get; set; }

        /// <summary>
        /// 下单时间
        /// </summary>
        [Required]
        public DateTime CreateTime { get; set; }

        /// <summary>
        /// 产品ID
        /// </summary>
        [Required]
        public int ProductID { get; set; }

        /// <summary>
        /// 购买数量
        /// </summary>
        [Required]
        public int Count { get; set; }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

# OrderContext.cs

using Microsoft.EntityFrameworkCore;

namespace Order.Api
{
    public class OrderContext : DbContext
    {
        public OrderContext(DbContextOptions<OrderContext> options)
           : base(options)
        {

        }

        public DbSet<Models.Order> Orders { get; set; }

        protected override void OnModelCreating(ModelBuilder modelBuilder)
        {

        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

# appsettings.json

{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft": "Warning",
      "Microsoft.Hosting.Lifetime": "Information"
    }
  },
  "AllowedHosts": "*",
  "ConsulSetting": {
    "ServiceName": "order.service",
    "ServiceIP": "192.168.31.191",
    "ServiceHealthCheck": "/healthcheck",
    "ConsulAddress": "http://192.168.31.191:8500"
  },
  "ConnectionString": "Server=192.168.31.210;Database=Microservice.Sample.Order;user id=sa;password=wpl19950815;MultipleActiveResultSets=true"
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# Startup.cs

public void ConfigureServices(IServiceCollection services)
{
    services.AddControllers();

    services.AddDbContext<OrderContext>(opt => opt.UseSqlServer(Configuration["ConnectionString"]));

    services.AddCap(x =>
    {
        x.UseEntityFramework<OrderContext>().UseRabbitMQ(option =>
        {
            option.HostName = "192.168.31.191";
            option.UserName = "guest";
            option.Password = "guest";
        });
    });
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

img

以上就是产品服务的新增以及订单服务的部分代码调整,功能很简单:各自添加自己的数据库表,订单服务增加下单接口,下单接口会发出“下单事件”。产品服务增加减库存接口,减库存接口会订阅“下单事件”。然后客户端调用下单接口下单时,产品服务会减去相应的库存。关于EF数据库迁移之类的基本使用不做介绍。

# 重新构建镜像

[root@centos-01 dotnetcore_src]# cd order.api.release/
[root@centos-01 order.api.release]# docker build -t order.api .
Sending build context to Docker daemon  16.05MB
Step 1/4 : FROM mcr.microsoft.com/dotnet/aspnet:5.0 AS base
 ---> a2be3e478ffa
Step 2/4 : WORKDIR /app
 ---> Using cache
 ---> 9f551bd1698a
Step 3/4 : COPY . /app
 ---> e19ab440e8a5
Step 4/4 : ENTRYPOINT ["dotnet", "Order.Api.dll"]
 ---> Running in 3d1e4110f02e
Removing intermediate container 3d1e4110f02e
 ---> 06322a6c6e83
Successfully built 06322a6c6e83
Successfully tagged order.api:latest

[root@centos-01 order.api.release]# cd ../product.api.release/
[root@centos-01 product.api.release]# docker build -t product.api .
Sending build context to Docker daemon  16.38MB
Step 1/4 : FROM mcr.microsoft.com/dotnet/aspnet:5.0 AS base
 ---> a2be3e478ffa
Step 2/4 : WORKDIR /app
 ---> Using cache
 ---> 9f551bd1698a
Step 3/4 : COPY . /app
 ---> 6f6d08e02d78
Step 4/4 : ENTRYPOINT ["dotnet", "Product.Api.dll"]
 ---> Running in 7616a505741e
Removing intermediate container 7616a505741e
 ---> 6be08521c6fe
Successfully built 6be08521c6fe
Successfully tagged product.api:latest
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

运行订单服务,产品服务:

docker run -d --name order.api -p 80:80 order.api  --ConsulSetting:ServicePort="80"
docker run -d --name order.api1 -p 81:80 order.api --ConsulSetting:ServicePort="81"
docker run -d --name order.api2 -p 82:80 order.api --ConsulSetting:ServicePort="82"
docker run -d --name product.api -p 85:80 product.api --ConsulSetting:ServicePort="85"
docker run -d --name product.api1 -p 86:80 product.api --ConsulSetting:ServicePort="86"
docker run -d --name product.api2 -p 87:80 product.api --ConsulSetting:ServicePort="87"
1
2
3
4
5
6

img

ocelot.json 增加路由配置:

{
  "Routes": [
    {
      // 路由规则匹配
      "DownstreamPathTemplate": "/orders/{url}",
      "DownstreamScheme": "http",
      "UpstreamPathTemplate": "/orders/{url}",
      // 增加Post请求
      "UpstreamHttpMethod": [ "Get", "Post" ],
      "ServiceName": "order.service",
      "LoadBalancerOptions": {
        "Type": "RoundRobin"
      },
      // 缓存
      "FileCacheOptions": {
        "TtlSeconds": 5,
        "Region": "regionname"
      },
      // 限流
      "RateLimitOptions": {
        "ClientWhitelist": [ "SuperClient" ],
        "EnableRateLimiting": true,
        "Period": "2s",
        "PeriodTimespan": 2,
        "Limit": 1
      },
      // 超时熔断
      "QoSOptions": {
        "ExceptionsAllowedBeforeBreaking": 3,
        "DurationOfBreak": 10000,
        "TimeoutValue": 5000
      }
    },
    {
      "DownstreamPathTemplate": "/products",
      "DownstreamScheme": "http",
      "UpstreamPathTemplate": "/products",
      "UpstreamHttpMethod": [ "Get" ],
      "ServiceName": "product.service",
      "LoadBalancerOptions": {
        "Type": "RoundRobin"
      },
      // 缓存
      "FileCacheOptions": {
        "TtlSeconds": 5,
        "Region": "regionname"
      },
      // 限流
      "RateLimitOptions": {
        "ClientWhitelist": [ "SuperClient" ],
        "EnableRateLimiting": true,
        "Period": "2s",
        "PeriodTimespan": 2,
        "Limit": 1
      },
      // 超时熔断
      "QoSOptions": {
        "ExceptionsAllowedBeforeBreaking": 3,
        "DurationOfBreak": 10000,
        "TimeoutValue": 5000
      }
    }
  ],
  "GlobalConfiguration": {
    "BaseUrl": "http://localhost:5000",
    "ServiceDiscoveryProvider": {
      "Scheme": "http",
      "Host": "192.168.31.191",
      "Port": 8500,
      "Type": "Consul"
    },
    "RateLimitOptions": {
      "DisableRateLimitHeaders": false,
      "QuotaExceededMessage": "too many requests...",
      "HttpStatusCode": 999,
      "ClientIdHeader": "Test"
    }
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79

至此整个环境就有点复杂了。要确保 SqlServer,RabbitMQ,Consul,服务实例、Gateway都正常运行:

img

img

img

cap.published 表和 cap.received 表由 CAP 自动生成,内部使用本地消息表+MQ来实现异步确保。

# 测试

使用Postman作为客户端调用下单接口(5000是Ocelot网关端口):

img

订单库: img

产品库: img

至此虽然功能很简单,但是实现了服务的解耦,异步调用,和最终一致性。要注意的是:

  1. 这里的事务是指:订单持久化到数据库/和下单事件保存到 cap.published表(保存到 cap.published 表理论上代表消息正常发送到MQ),要么一同成功,要么一同失败。如果这个事务成功,那么就可以认为这个业务流程是成功的
  2. 产品服务的减库存是否成功那是产品服务的事,理论上也应该是成功的。因为消息已经确保发到了MQ,产品服务必然会收到消息。CAP也提供了失败重试,和失败回调机制,要理解 “CAP 是基于MQ加本地消息表来实现异步确保”
  3. 如果下单成功但是库存不足导致减库存失败了怎么办,是否需要回滚订单表的数据?如果产生这种想法,说明还没有真正理解最终一致性的思想。首先下单前肯定会检查一下库存数量,既然允许下单那么必然是库存充足的。(高并发下保证不超卖是另一个问题这里不考虑)如果非要数据回滚也是能实现的,CAP的 ICapPublisher.Publish 方法提供一个callbackName 参数,当减库存时,可以触发这个回调。其本质也是通过发布订阅完成,但不推荐
  4. CAP无法保证消息不重复,实际使用中需要自己考虑一下实现消息的重复过滤和幂等