正文部分

免费第一页香蕉碰碰人人视频网站 一个基于 RabbitMQ 的可复用的分布式事务消息架构方案!

上图仅仅演示了事务正确提交的场景(不包含异常的场景)。这里可以明确知道,事务同步器TransactionSynchronization的afterCommit和afterCompletion(int status)方法都在真正的事务提交点AbstractPlatformTransactionManager#doCommit之后回调,因此可以选用这两个方法其中之一用于执行推送消息到RabbitMQ服务端,整体的伪代码如下:

引入核心依赖:

privateLong id; privateLocalDateTime createTime; privateLocalDateTime editTime; privateStringcreator; privateStringeditor; privateInteger deleted; privateInteger currentRetryTimes; privateInteger maxRetryTimes; privateStringqueueName; privateStringexchangeName; privateStringexchangeType; privateStringroutingKey; privateStringbusinessModule; privateStringbusinessKey; privateLocalDateTime nextScheduleTime; privateInteger messageStatus; privateLong initBackoff; privateInteger backoffFactor; }

消息记录状态和内容持久化的管理统一放在TransactionalMessageManagementService中:

DIRECT( "direct"),

ExchangeType exchangeType;

方案代码设计

事务消息模块主要涉及两张表,以MySQL为例,建表DDL如下:

一般而言,一个系统的吞吐量和系统的异步化处理占比成正相关(这一点可以参考Amdahl's Law),所以在系统架构设计实际中应该尽可能使用异步交互,提高系统吞吐量同时减少同步阻塞带来的无谓等待。事务消息模块可以扩展出一个后台管理,甚至可以配合Micrometer、Prometheus和Grafana体系做实时数据监控。

然后定义dao接口(这里暂时不展开实现的细节代码,存储使用MySQL,如果要替换为其他类型的数据库,只需要使用不同的实现即可):

作者:黄哥

一般情况下,如果服务通过配置自行提前声明队列和交换器的绑定关系,那么发送RabbitMQ消息的时候其实只依赖于exchangeName和routingKey两个字段(header类型的交换器是特殊的,也比较少用,这里暂时不用考虑),考虑到服务可能会遗漏声明操作免费第一页香蕉碰碰人人视频网站,发送消息的时候会基于队列进行首次绑定声明并且缓存相关的信息(RabbitMQ中的队列-交换器绑定声明只要每次声明绑定关系的参数一致免费第一页香蕉碰碰人人视频网站,则不会抛出异常)。

voidsendTransactionalMessage(Destination destination, TxMessage message); }

模拟订单数据成功保存免费第一页香蕉碰碰人人视频网站,而且RabbitMQ消息在事务成功提交后正常发送到RabbitMQ服务端中,如RabbitMQ控制台数据所示。

事务同步器接口TransactionSynchronization的afterCommit方法的实现中,推送对应的消息到RabbitMQ,然后更变事务消息记录的状态为推送成功。

privatevoidmarkSuccess(TransactionalMessage record){ // 标记下一次执行时间为最大值record.setNextScheduleTime(END);record.setCurrentRetryTimes(record.getCurrentRetryTimes.compareTo(record.getMaxRetryTimes) >= 0? record.getMaxRetryTimes : record.getCurrentRetryTimes 1); record.setMessageStatus(TxMessageStatus.SUCCESS.getStatus);record.setEditTime(LocalDateTime.now);messageDao.updateStatusSelective(record);}

事务同步器接口TransactionSynchronization的afterCommit方法的实现中,推送对应的消息到RabbitMQ,然后更变事务消息记录的状态为推送成功。

List<TransactionalMessageContent> queryByMessageIds( String messageIds) ; }

接着需要自定义一个事务消息功能模块,新增一个事务消息记录表(其实就是本地消息表),用于保存每一条需要发送的消息记录。事务消息功能模块的主要功能是:

/*** 推送补偿 - 里面的参数应该根据实际场景定制*/publicvoidprocessPendingCompensationRecords{ // 时间的右值为当前时间减去退避初始值,这里预防把刚保存的消息也推送了LocalDateTime max = LocalDateTime.now.plusSeconds(-DEFAULT_INIT_BACKOFF);// 时间的左值为右值减去1小时LocalDateTime min = max.plusHours(- 1); Map<Long, TransactionalMessage> collect = messageDao.queryPendingCompensationRecords(min, max, LIMIT).stream.collect(Collectors.toMap(TransactionalMessage::getId, x -> x));if(!collect.isEmpty) { StringJoiner joiner = newStringJoiner( ",", "(", ")"); collect.keySet.forEach(x -> joiner.add(x.toString));contentDao.queryByMessageIds(joiner.toString).forEach(item -> {TransactionalMessage message = collect.get(item.getMessageId);sendMessageSync(message, item.getContent);});}}}

小结

/*** 计算下一次执行时间** @parambase 基础时间 * @paraminitBackoff 退避基准值 * @parambackoffFactor 退避指数 * @paramround 轮数 * @returnLocalDateTime */privateLocalDateTime calculateNextScheduleTime(LocalDateTime base,longinitBackoff, longbackoffFactor, longround) { doubledelta = initBackoff * Math.pow(backoffFactor, round); returnbase.plusSeconds(( long) delta); }

在事务执行的逻辑单元里面,需要进行待推送的事务消息记录的保存,也就是:本地(业务)逻辑和事务消息记录保存操作绑定在同一个事务。

StringqueueName;

privatevoidmarkFail(TransactionalMessage record, Exception e){ log.error( "发送消息失败,目标队列:{}", record.getQueueName, e); record.setCurrentRetryTimes(record.getCurrentRetryTimes.compareTo(record.getMaxRetryTimes) >= 0? record.getMaxRetryTimes : record.getCurrentRetryTimes 1); // 计算下一次的执行时间LocalDateTime nextScheduleTime = calculateNextScheduleTime(record.getNextScheduleTime,record.getInitBackoff,record.getBackoffFactor,record.getCurrentRetryTimes);record.setNextScheduleTime(nextScheduleTime);record.setMessageStatus(TxMessageStatus.FAIL.getStatus);record.setEditTime(LocalDateTime.now);messageDao.updateStatusSelective(record);}

为了统一处理补偿推送的问题,使用了有限状态判断消息是否已经推送成功:

@Overridepublicvoidrun(String... args)throwsException { mockBusinessService.saveOrder;}}

最后添加两个测试类:

分布式事务是微服务实践中一个比较棘手的问题,在笔者所实施的微服务实践方案中,都采用了折中或者规避强一致性的方案。参考Ebay多年前提出的本地消息表方案,基于RabbitMQ和MySQL(JDBC)做了轻量级的封装,实现了低入侵性的事务消息模块。本文的内容就是详细分析整个方案的设计思路和实施。环境依赖如下:

privatefinalJdbcTemplate jdbcTemplate; privatefinalTransactionalMessageService transactionalMessageService; privatefinalObjectMapper objectMapper;

简单来说就是:上游保证了自身的业务正确性,成功推送了正确的消息到RabbitMQ就认为上游义务已经结束。

CREATETABLE`t_transactional_message_content`(idBIGINTUNSIGNEDAUTO_INCREMENT PRIMARY KEY, message_id BIGINTUNSIGNEDNOTNULLCOMMENT'事务消息记录ID', contentTEXTCOMMENT'消息内容') COMMENT'事务消息内容表';

上面伪代码中,保存事务消息和注册事务同步器两个步骤可以安插在事务方法中的任意位置,也就是说与执行顺序无关。

voidinsert( TransactionalMessageContent record) ;

FANOUT( "fanout"),

@OverridepublicvoidsendTransactionalMessage(Destination destination, TxMessage message){ String queueName = destination.queueName;String exchangeName = destination.exchangeName;String routingKey = destination.routingKey;ExchangeType exchangeType = destination.exchangeType;// 原子性的预声明QUEUE_ALREADY_DECLARE.computeIfAbsent(queueName, k -> {Queue queue = newQueue(queueName); amqpAdmin.declareQueue(queue);Exchange exchange = newCustomExchange(exchangeName, exchangeType.getType); amqpAdmin.declareExchange(exchange);Binding binding = BindingBuilder.bind(queue).to(exchange).with(routingKey).noargs;amqpAdmin.declareBinding(binding);returntrue; });TransactionalMessage record = newTransactionalMessage; record.setQueueName(queueName);record.setExchangeName(exchangeName);record.setExchangeType(exchangeType.getType);record.setRoutingKey(routingKey);record.setBusinessModule(message.businessModule);record.setBusinessKey(message.businessKey);String content = message.content;// 保存事务消息记录managementService.saveTransactionalMessageRecord(record, content);// 注册事务同步器TransactionSynchronizationManager.registerSynchronization( newTransactionSynchronizationAdapter { @OverridepublicvoidafterCommit{ managementService.sendMessageSync(record, content);}});}}

方案设计思路

;

@Getter@RequiredArgsConstructorpublicenumExchangeType {

事务执行的逻辑单元

这里有一点尚待优化:更新事务消息记录状态的方法可以优化为批量更新,在limit比较大的时候,批量更新的效率会更高。最后是定时任务的配置类:

/*** 这里用的是本地的Redis,实际上要做成配置*/privatefinalRedissonClient redisson = Redisson.create;

本文demo项目仓库:rabbit-transactional-message

为了把保存待发送的事务消息和发送消息到RabbitMQ两个动作从使用者感知角度合并为一个动作,这里需要用到Spring特有的事务同步器TransactionSynchronization,这里分析一下事务同步器的主要方法的回调位置,主要参考AbstractPlatformTransactionManager#commit或者AbstractPlatformTransactionManager#processCommit方法:

如下图所示:

@Slf4j @RequiredArgsConstructor@ServicepublicclassMockBusinessService{

privateLong id; privateLong messageId; privateStringcontent; }

在事务方法内,保存事务消息的时候,标记消息记录推送状态为处理中。

StringexchangeName;

事务消息模块的设计仅仅是使异步消息推送这个功能实现趋向于完备,其实一个合理的异步消息交互系统,一定会提供同步查询接口,这一点是基于异步消息没有回调或者没有响应的特性导致的。

虽然之前提到笔者建议下游服务自理自身服务消费异常的场景,但是有些时候迫于无奈还是需要上游把对应的消息重新推送,这个算是特殊的场景。

@DatapublicclassTransactionalMessageContent {

privatefinal Stringtype; }

// 发送消息的目的地publicinterfaceDestination {

TransactionalMessageService的实现类是事务消息的核心功能实现,代码如下:

事务消息原则上只适合弱一致性(或者说最终一致性)的场景,常见的弱一致性场景如:

下面的方案设计描述中,暂时忽略了消息事务管理后台的API设计,这些可以在后期补充。

privatefinalTransactionalMessageDao messageDao; privatefinalTransactionalMessageContentDao contentDao; privatefinalRabbitTemplate rabbitTemplate;

@Transactional(rollbackFor = Exception.class) publicvoidsaveOrderthrowsException { String orderId = UUID.randomUUID.toString;BigDecimal amount = BigDecimal.valueOf( 100L); Map<String, Object> message = newHashMap<>; message.put( "orderId", orderId); message.put( "amount", amount); jdbcTemplate.update( "INSERT INTO t_order(order_id,amount) VALUES (?,?)", p -> { p.setString( 1, orderId); p.setBigDecimal( 2, amount); });String content = objectMapper.writeValueAsString(message);transactionalMessageService.sendTransactionalMessage(DefaultDestination.builder.exchangeName( "tm.test.exchange") .queueName( "tm.test.queue") .routingKey( "tm.test.key") .exchangeType(ExchangeType.DIRECT).build,DefaultTxMessage.builder.businessKey(orderId).businessModule( "SAVE_ORDER") .content(content).build);log.info( "保存订单:{}成功...", orderId); }}

一般情况下,要求强一致性说明要严格同步,也就是所有操作必须同时成功或者同时失败,这样就会引入同步带来的额外消耗。

privatefinalAmqpAdmin amqpAdmin; privatefinalTransactionalMessageManagementService managementService;

强一致性的场景一般不应该选用事务消息。

publicvoidsendMessageSync(TransactionalMessage record, String content){ try{ rabbitTemplate.convertAndSend(record.getExchangeName, record.getRoutingKey, content);if(log.isDebugEnabled) { log.debug( "发送消息成功,目标队列:{},消息内容:{}", record.getQueueName, content); }// 标记成功markSuccess(record);} catch(Exception e) { // 标记失败markFail(record, e);}}

为了降低代码的入侵性,事务消息需要借助Spring的编程式事务或者声明式事务。编程式事务一般依赖于TransactionTemplate,而声明式事务依托于AOP模块,依赖于注解@Transactional。

demo必须本地安装MySQL、Redis和RabbitMQ才能正常启动,本地必须新建一个数据库命名local。

JDK1.8 spring-boot-start-web:2.x.x spring-boot-start-jdbc:2.x.x spring-boot-start-amqp:2.x.x HikariCP:3.x.x(spring-boot-start-jdbc自带) mysql-connector-java:5.1.48 redisson:3.12.1 用户服务完成了注册动作,向短信服务推送一条营销相关的消息。 信贷体系中,订单服务保存订单完毕,向审批服务推送一条待审批的订单记录信息。 …… 保存消息记录。 推送消息到RabbitMQ服务端。 消息记录的查询、补偿推送等等。 展开全文

spring-boot-starter-jdbc、mysql-connector-java和spring-boot-starter-aop是MySQL事务相关,而spring-boot-starter-amqp是RabbitMQ客户端的封装,redisson主要使用其分布式锁,用于补偿定时任务的加锁执行(以防止服务多个节点并发执行补偿推送)。

publicinterfaceTransactionalMessageContentDao{

在事务方法内,保存事务消息的时候,标记消息记录推送状态为处理中。

voidinsertSelective( TransactionalMessage record) ;

publicvoidsaveTransactionalMessageRecord(TransactionalMessage record, String content){ record.setMessageStatus(TxMessageStatus.PENDING.getStatus);record.setNextScheduleTime(calculateNextScheduleTime(LocalDateTime.now, DEFAULT_INIT_BACKOFF,DEFAULT_BACKOFF_FACTOR, 0)); record.setCurrentRetryTimes( 0); record.setInitBackoff(DEFAULT_INIT_BACKOFF);record.setBackoffFactor(DEFAULT_BACKOFF_FACTOR);record.setMaxRetryTimes(DEFAULT_MAX_RETRY_TIMES);messageDao.insertSelective(record);TransactionalMessageContent messageContent = newTransactionalMessageContent; messageContent.setContent(content);messageContent.setMessageId(record.getId);contentDao.insert(messageContent);}

因为此模块有可能扩展出一个后台管理模块,所以要把消息的管理和状态相关字段和大体积的消息内容分别存放在两个表,从而避免大批量查询消息记录的时候MySQL服务IO使用率过高的问题(这是和上一个公司的DBA团队商讨后得到的一个比较合理的方案)。预留了两个业务字段business_module和business_key用于标识业务模块和业务键(一般是唯一识别号,例如订单号)。

定义模型实体类TransactionalMessage和TransactionalMessageContent:

某次测试结果如下:

TOPIC( "topic"),

另外还有一个场景需要考虑:事务提交之后触发事务同步器TransactionSynchronization的afterCommit方法失败。这是一个低概率的场景,但是在生产中一定会出现,一个比较典型的原因就是:事务提交完成后尚未来得及触发TransactionSynchronization#afterCommit方法进行推送服务实例就被重启。

StringroutingKey; }@BuilderpublicclassDefaultDestination implementsDestination { privateExchangeType exchangeType; privateStringqueueName; privateStringexchangeName; privateStringroutingKey; @OverridepublicExchangeType exchangeType { returnexchangeType; }@OverridepublicStringqueueName { returnqueueName; }@OverridepublicStringexchangeName { returnexchangeName; }@OverridepublicStringroutingKey { returnroutingKey; }}// 事务消息publicinterfaceTxMessage { StringbusinessModule; StringbusinessKey; Stringcontent; }@BuilderpublicclassDefaultTxMessage implementsTxMessage { privateStringbusinessModule; privateStringbusinessKey; privateStringcontent; @OverridepublicStringbusinessModule { returnbusinessModule; }@OverridepublicStringbusinessKey { returnbusinessKey; }@OverridepublicStringcontent { returncontent; }}// 消息状态@RequiredArgsConstructorpublicenumTxMessageStatus { /*** 成功*/SUCCESS( 1), /*** 待处理*/PENDING( 0), /*** 处理失败*/FAIL( -1), ;privatefinal Integer status; }

原标题:一个基于 RabbitMQ 的可复用的分布式事务消息架构方案!

voidupdateStatusSelective( TransactionalMessage record) ;

List<TransactionalMessage> queryPendingCompensationRecords( LocalDateTime minScheduleTime,LocalDateTime maxScheduleTime,intlimit ); }

还有一种极为特殊的情况是RabbitMQ服务端本身出现故障导致消息推送异常,这种情况下需要进行重试(补偿推送),经验证明短时间内的反复重试是没有意义的,故障的服务一般不会瞬时恢复,所以可以考虑使用指数退避算法进行重试,同时需要限制最大重试次数。

发送消息到RabbitMQ服务端这一步需要延后到事务提交之后,这样才能保证事务提交成功和消息成功发送到RabbitMQ服务端这两个操作是一致的。

作者:黄哥

privatestaticfinalConcurrentMap<String, Boolean> QUEUE_ALREADY_DECLARE = newConcurrentHashMap<>;

@Scheduled(fixedDelay = 10000) publicvoidtransactionalMessageCompensationTaskthrowsException { RLock lock = redisson.getLock( "transactionalMessageCompensationTask"); // 等待时间5秒,预期300秒执行完毕,这两个值需要按照实际场景定制booleantryLock = lock.tryLock( 5, 300, TimeUnit.SECONDS); if(tryLock) { try{ longstart = System.currentTimeMillis; log.info( "开始执行事务消息推送补偿定时任务..."); managementService.processPendingCompensationRecords;longend = System.currentTimeMillis; longdelta = end - start; // 以防锁过早释放if(delta < 5000) { Thread.sleep( 5000- delta); }log.info( "执行事务消息推送补偿定时任务完毕,耗时:{} ms...", end - start); } finally{ lock.unlock;}}}}

接着定义事务消息服务接口TransactionalMessageService:

如果一个事务消息模块设计合理,补偿、查询、监控等等功能都完毕,由于系统交互是异步的,整体吞吐要比严格同步高。在笔者负责的业务系统中基于事务消息使用还定制了一条基本原则:消息内容正确的前提下,消费方出现异常需要自理。

方案实施

事务消息的补偿

privatefinalTransactionalMessageManagementService managementService;

privatefinalMockBusinessService mockBusinessService;

DEFAULT( ""),

privatestaticfinalLocalDateTime END = LocalDateTime.of( 2999, 1, 1, 0, 0, 0); privatestaticfinallongDEFAULT_INIT_BACKOFF = 10L; privatestaticfinalintDEFAULT_BACKOFF_FACTOR = 2; privatestaticfinalintDEFAULT_MAX_RETRY_TIMES = 5; privatestaticfinalintLIMIT = 100;

基本代码编写完,整个项目的结构如下:

指数值、间隔值和最大重试次数上限需要根据实际情况设定,否则容易出现消息延时过大或者重试过于频繁等问题。

表设计

美国一女子退休后住在罗拉达州的一个偏远郊区,退休后没什么事做,女子就会到处去旅游,可女子旅游回家后,一进门就发现家里好像有什么不对的地方。

新京报快讯 据平安瓯海官微消息,6月6日17时50分许,温州市瓯海区公安分局接到报警,报警人称其女儿蔡某(12岁,河南人,现住瓯海区三垟街道),当日下午16时20分许与同学一起下课回家,中途和同学分开后未归。