springcloud 整合 shardingsphere 及 seata

2023-06-06
4 min read

springcloud 整合 shardingsphere 及 seata, 为分布式事务的场景提供了十分优雅的解决方案

垂直 vs 水平 拆分

垂直分库,把单一数据库按业务进行划分,做到专库专表
垂直分表,把表中的一部分数据列存储到一张表,再将另外一部分数据列存储到另外一张或多张表,这种方式叫垂直分表
垂直分,都是切分表头,更多考虑微服务划分和数据库设计。

水平分库,表的数据一部分存储在一个数据库,另一部分数据存储在另外一个数据库中
水平分表,表的数据一部分存储在一个表,另一部分数据存储在另外一个表中
水平分,都是切表体

分库分表带来的问题:

  1. 跨节点连接查询问题,分页、排序
  2. 多数据源管理问题,

springcloud 整合 shardingsphere 及 seata

依赖

依赖我们选择引入、spring-cloud-starter-alibaba-seata 作为 seata 的起步依赖、sharding-jdbc-spring-boot-starter 作为 shardingshpere 分库分表的核心依赖、sharding-transaction-base-seata-at 作为 shardingsphere 支持 seata-at 模式的依赖

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
    <version>${depend on you springcloud version}</version>
</dependency>
<dependency>
    <groupId>org.apache.shardingsphere</groupId>
    <artifactId>sharding-jdbc-spring-boot-starter</artifactId>
    <version>4.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.shardingsphere</groupId>
    <artifactId>sharding-transaction-base-seata-at</artifactId>
    <version>4.1.1</version>
</dependency>

数据库环境

模拟分库(最常见的方案),模拟将数据分库存储至 2 个不同的库,首先创建好 2 个数据库 cube_ld_archetype_dev_1、cube_ld_archetype_dev_1,并且创建相同的表 t_ld_daily_user_d。

delimiter ;
CREATE TABLE cube_ld_archetype_dev_1.t_ld_daily_user_d (
     `id` bigint NOT NULL,
     `name` varchar(16) NOT NULL COMMENT '姓名',
     `dd_union_id` varchar(32) DEFAULT NULL COMMENT '钉钉 id',
     `gender` tinyint NOT NULL COMMENT '性别,1 男 2 女',
     `mobile` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL,
     `phone` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL,
     `create_time`  datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
     `update_time`  datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
     `modifier`     bigint   NOT NULL DEFAULT '-1' COMMENT '修改人',
     `creator`      bigint   NOT NULL DEFAULT '-1' COMMENT '创建人',
     `tenant_id`    bigint   NOT NULL DEFAULT '-1' COMMENT '租户',
     PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3 COMMENT='用户表';

CREATE TABLE cube_ld_archetype_dev_2.t_ld_daily_user_d (
     `id` bigint NOT NULL,
     `name` varchar(16) NOT NULL COMMENT '姓名',
     `dd_union_id` varchar(32) DEFAULT NULL COMMENT '钉 钉 id',
     `gender` tinyint NOT NULL COMMENT '性别,1 男 2 女',
     `mobile` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL,
     `phone` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL,
     `create_time`  datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
     `update_time`  datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
     `modifier`     bigint   NOT NULL DEFAULT '-1' COMMENT '修改人',
     `creator`      bigint   NOT NULL DEFAULT '-1' COMMENT '创建人',
     `tenant_id`    bigint   NOT NULL DEFAULT '-1' COMMENT '租户',
     PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3 COMMENT='用户表';

由于使用了 seata 的 at 模式,因此需要在每个库创建 undo_log 表

delimiter ;
CREATE TABLE IF NOT EXISTS cube_ld_archetype_dev_1.`undo_log`
(
    `branch_id`     BIGINT       NOT NULL COMMENT 'branch transaction id',
    `xid`           VARCHAR(128) NOT NULL COMMENT 'global transaction id',
    `context`       VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
    `rollback_info` LONGBLOB     NOT NULL COMMENT 'rollback info',
    `log_status`    INT(11)      NOT NULL COMMENT '0:normal status,1:defense status',
    `log_created`   DATETIME(6)  NOT NULL COMMENT 'create datetime',
    `log_modified`  DATETIME(6)  NOT NULL COMMENT 'modify datetime',
    UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8mb4 COMMENT ='AT transaction mode undo table';
ALTER TABLE cube_ld_archetype_dev_1.`undo_log` ADD INDEX `ix_log_created` (`log_created`);

CREATE TABLE IF NOT EXISTS cube_ld_archetype_dev_2.`undo_log`
(
    `branch_id`     BIGINT       NOT NULL COMMENT 'branch transaction id',
    `xid`           VARCHAR(128) NOT NULL COMMENT 'global transaction id',
    `context`       VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
    `rollback_info` LONGBLOB     NOT NULL COMMENT 'rollback info',
    `log_status`    INT(11)      NOT NULL COMMENT '0:normal status,1:defense status',
    `log_created`   DATETIME(6)  NOT NULL COMMENT 'create datetime',
    `log_modified`  DATETIME(6)  NOT NULL COMMENT 'modify datetime',
    UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8mb4 COMMENT ='AT transaction mode undo table';
ALTER TABLE cube_ld_archetype_dev_2.`undo_log` ADD INDEX `ix_log_created` (`log_created`);

编写配置

由于使用了 springcloud-seata 和 springcloud-shardingsphere 的依赖,因此可以直接在注册中心上进行配置;
可以使用 2 个配置文件来进行 seata 和 shardingsphere 分开配置;
sharingsphere 配置使用 tenant_id 来进行分库;

seata 配置, 此配置可在 nacos 配置中心完成

seata:
  config:
    type: nacos
    nacos:
      server-addr: ${spring.cloud.nacos.config.server-addr}
      group: SEATA_GROUP
      namespace: ${spring.cloud.nacos.config.namespace}
      username: ${nacos.username}
      password: ${nacos.password}
      cluster: default
      data-id: seataServer.properties
  registry:
    type: nacos
    nacos:
      server-addr: ${spring.cloud.nacos.discovery.server-addr}
      group: SEATA_GROUP
      namespace: ${spring.cloud.nacos.discovery.namespace}
      username: ${nacos.username}
      password: ${nacos.password}
      application: seata-server
      cluster: default
  tx-service-group: default_tx_group
  service:
    vgroup-mapping:
      default_tx_group: default
    enable-auto-data-source-proxy: true
  data-source-proxy-mode: AT

shardingsphere 配置,此配置可在 nacos 配置中心完成

spring:
  jackson:
    time-zone: Asia/Shanghai
  cache:
    type: none
  shardingsphere:
    dataSource:
      names: ds1,ds2
      ds1:
        type: com.zaxxer.hikari.HikariDataSource
        driverClassName: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://dsip1:3306/cube_ld_archetype_dev_1?useUnicode=true&characterEncoding=UTF-8&
          serverTimezone=Asia/Shanghai&zeroDateTimeBehavior=convertToNull&
          autoReconnect=true&failOverReadOnly=false&maxReconnects=2&useSSL=false
        username: root
        password: xxx
      ds2:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://dsip2:3306/cube_ld_archetype_dev_2?useUnicode=true&characterEncoding=UTF-8&
          serverTimezone=Asia/Shanghai&zeroDateTimeBehavior=convertToNull&
          autoReconnect=true&failOverReadOnly=false&maxReconnects=2&useSSL=false
        username: root
        password: xxx
    sharding:
      tables:
        t_ld_daily_user_d:
          database-strategy:
            inline:
              sharding-column: tenant_id
              algorithm-expression: ds$->{tenant_id % 2 + 1}

seata.conf 这一步是 shardingsphere 与 seata 整合的配置,此配置需要在项目的 resource 目录下,目前不是很优雅,但是要实现在 nacos 上配置也不难。

sharding.transaction.seata.at.enable=true
client.application.id=archetype
client.transaction.service.group=default_tx_group

编写代码

注意:@Transactional 和 @ShardingTransactionType 注解必须同时添加才能使分布式事务生效 TransactionType.BASE 其实就是 shardingsphere 对于弱一致性事务的定义,在这里可理解 TransactionType.BASE 是 seata at 事务的映射。 当然除了弱一致性以外,还有强一致性 XA 事务,seata 默认也是支持的,只是 XA 事务依赖数据库支持,在使用 XA 事务时你需要提前确认数据库是否支持 XA 事务。
在下面的代码中,程序有一个除零错误,故意设计一个错误,我们期望分布式事务生效的情况下,能回滚 userMapper.insert(user1) 这行代码对数据库的操作。

/**
 * save mutiple record
 * testing distributed transaction
 */
@PostMapping("/saveTestShardingSphere")
@Transactional(rollbackFor = Exception.class)
@ShardingTransactionType(TransactionType.BASE)
public Result<Long> saveTestShardingSphere() {
    User user1 = new User();
    user1.setId(1L);
    user1.setTenantId(1L);
    user1.setName("van1");
    user1.setGender(true);
    user1.setPhone("7789");
    user1.setDdUnionId("7789");
    userMapper.insert(user1);

    User user2 = new User();
    user2.setId(2L);
    user2.setTenantId(2L);
    user2.setName("van2");
    user2.setGender(true);
    user2.setPhone("7789");
    user2.setDdUnionId("7789");
    userMapper.insert(user2);

    int errorInt = 1 / 0;

    return Result.success();
}

启动 seata-server

seata 的 AT 事务要求 TC 与 RM 能双向通讯,因此在做测试的时候,常常需要在本地启动一个 seata-server(TC)以确保事务生效。

cd $seata-server-home/bin
sh seata-server.sh -p 8091 -h 172.20.10.2 -m file
tail -f ../logs/start.out

启动 springcloud 项目

mvn clean -T 1C install -f ../pom.xml -Dmaven.test.skrp=true -U
java \
-Xms1024m -Xmx1024m \
-Dspring.application.name=archetype \
-Dspring.cloud.nacos.config.file-extension=yml \
-Duser.timezone=GMT+08 \
-Dserver.port=8080 \
-Dspring.cloud.nacos.discovery.server-addr=nacos:8848 \
-Dspring.cloud.nacos.discovery.namespace=e29572d7-7ccc-4a18-81da-dbe891677336 \
-Dspring.cloud.nacos.config.server-addr=nacos:8848 \
-Dspring.cloud.nacos.config.namespace=e29572d7-7ccc-4a18-81da-dbe891677336 \
-Dspring.config.import[0]=nacos:seata-at \
-Dspring.config.import[1]=nacos:archetype-shardingjdbcDb \
-Dspring.cloud.nacos.discovery.register-enabled=true \
-Dlogging.level.com.baomidou.mybatisplus=DEBUG \
-Dmybatis-plus.configuration.log-impl=org.apache.ibatis.logging.stdout.StdOutImpl \
-jar ./target/archetype.jar

接口测试

发起请求测试

POST http://localhost:8080/user/saveTestShardingSphere

数据库查看

delimiter ;
-- delete from cube_ld_archetype_dev_1.t_ld_daily_user_d;
-- delete from cube_ld_archetype_dev_2.t_ld_daily_user_d;
SELECT * FROM cube_ld_archetype_dev_1.t_ld_daily_user_d;
SELECT * FROM cube_ld_archetype_dev_2.t_ld_daily_user_d;

日志

Branch Rollbacked result: PhaseTwo_Rollbacked

总结

shardingsphere 与 seata 的整合的确做到了配置清晰,使用简单;
强强联合,为分布式事务的场景提供了十分优雅的解决方案;