跳到主要内容

触发器

Jimmer支持触发器,用户可以监听数据库的变化。

提示

触发器不仅可以告知对象的变更,也可以告知关联的变更。

触发器类型

触发器分类

  • BinLog触发器

    这是默认的触发器类型,不会影响Jimmer本身生成的SQL,具有较高性能,在事务提交后被触发,能监听任何原因导致的数据库变化,包括非Jimmer API引起的数据和变化;但需要底层数据库支持binlog。

  • Transaction触发器

    该触发器不无需底层数据库,在事务提交前被触发;工作机制和Alibaba Seata的AT模式类似,会在修改过程中生成额外的查询语句,对修改性能有一定影响,也只能监听因Jimmer API导致的数据库的变化。

两种触发器的区别如下

BinLog触发器Transaction触发器
触发时机事务提交后事务提交前
性能
可监听的数据库变化任何原因导致的数据库变化仅因当前应用调用Jimmer API导致的数据库变化
数据库要求支持且启用binlog无任何要求
工作原理利用第三方技术将数据库binlog变更推送到消息队列,Jimmer应用监听消息队列Jimmer的任何修改操作API均自动植入额外的SQL查询以找寻数据变更,和Alibaba Seata的AT模式类似

除了这个表格中的区别外,两种触发器为用户提供的通知数据完全相同。

推荐用法

  • BinLog触发器

    BinLog触发器在事务提交后触发,面对无法更改的既定事实。

    即,BinLog触发器对原事务毫无影响,被允许做耗时操作。所以适合在其处理逻辑中执行多个任务,尤其是这些任务

    • 缓存一致性维护
    • 异构数据源同步
    • 以异步方式向其它微服务发送消息
  • Transaction触发器

    Transaction触发器在事务提交前触发,其处理逻辑会直接植入当前事务。

    如果其事件处理逻辑异常,会导致当前业务修改失败;如果其处理逻辑不能很快完成,会导致当前事务长时间不释放资源。

    因此,Transaction触发器适合在当前事务中追加更多的修改行为,不会破坏原子性。

    适合在数据库发生变化时通过添加额外的修改来实现通用性很强的部分业务逻辑。

设置触发器类型

概念

在讨论设置触发器类型之前,我们先看开发人员如何使用触发器

  • sqlClient.getTriggers()sqlClient.getTriggers(false): 优先返回BinLog触发器,如果没有,则返回Transaction触发器。

  • sqlClient.getTriggers(true): 明确返回Transaction触发器,如果没有,抛出异常。

为了影响后续用户通过sqlClient.getTriggers能获取的触发器类型,需要在构建SqlClient时指定TriggerType。

TriggerType有三个取值

  • BINLOG_ONLY:

    仅支持BinLog触发器,这是默认配置。

    • sqlClient.getTriggers()sqlClient.getTriggers(false)返回BinLog触发器对象
    • sqlClient.getTriggers(true)会异常,无法返回Transaction触发器对象
  • TRANSACTION_ONLY:

    仅支持Transaction触发器。 无论sqlClient.getTriggers的参数为何,都会返回同一个Transaction触发器对象

  • BOTH:

    同时支持BinLog触发器何Transaction触发器。

    • sqlClient.getTriggers()sqlClient.getTriggers(false)返回BinLog触发器对象
    • sqlClient.getTriggers(true)返回Transaction触发器对象

这里,用一张表格来对比三种情况

触发器类型getTriggers(false)getTriggers(true)
BINLOG_ONLYBinLog Triggers专用对象抛出异常
TRANSACTION_ONLY
共享的Transaction Triggers对象
BOTHBinLog Triggers专用对象Transaction Triggers专用对象

Q & A

  • Q: 为什么默认BINLOG_ONLY模式?

    A: Transaction触发器会在所有保存行为中植入额外的查询用于模拟触发器,对性能有影响,默认不支持。

  • Q: 为什么TRANSACTION_ONLY模式下,两种触发器API共享同一个对象?

    A: Jimmer内置的缓存一致性策略,一定使用sqlClient.getTriggers(false),开发人员无法改变。

    这样做的目的是,让缓存一致性维护工作不影响修改事务,在事务提交后才开始执行。因此,原始事务不会被拉长,从而快速结束释放锁资源。

    但是,并非所有数据库产品都支持binlog。这时,getTriggers(false)返回Transaction触发器对象,冒充BinLog触发器对象,接手原本应该由BinLog触发器负责的缓存一致性维护工作。

    也就是说,TRANSACTION_ONLY是为不支持binlog的数据库设计的,这是采用该模式的唯一理由

  • Q: BOTH模式下,存在两个不同的触发器API对象,是不是表示任何修改都有两次响应机会?

    A: 是的,而且这是一个重要的功能。

    和Jimmer内置的缓存一致性机制必须由sqlClient.getTriggers(false)驱动不同;用户的业务代码没有这个限制,开发人员可以自由决定某个处理逻辑究竟注册到sqlClient.getTriggers(false)还是sqlClient.getTriggers(true),甚至同时向二者注册。

    • 如果开发人员的事件响应逻辑包含一些数据联动修改,必须参与当前事务的原子性作用域,就应该选择将处理逻辑注册到sqlClient.getTriggers(true)

    • 如果开发人员的事件响应逻辑没必要参与当前事务,就应该选择将处理逻辑注册到sqlClient.getTriggers(false),让当前事务尽快结束,尽快释放锁资源

    • 如果开发人员的事件响应逻辑同时包含以上两种情况,就应该将处理逻辑一分为二,然后分别注册到两种触发器上。

      警告

      如果开发人员为两种触发器注册了同一个事件响应回调,那么每次事件通知时这个回调的确会被执行两次。

      这时,区分两次调用就非常重要了。回调方法的参数是一个对象,可以获取JDBC连接对象,其值是否为null可用作区分二者的标准:

      • 非null,第一次回调,由Transaction Trigger引起
      • null,第二次回调,由BinLog Trigger引起
  • Q: 对于不支持binlog的数据库而言,其缓存一致性清理是不是无法在事务提交后做?

    A: 并非如此,如果开发人员愿意优化,可以做到。

    诚然,这种数据库无法支持BinLog触发器,采用Transaction触发器在事务生命周期内得到数据变更通知是唯一可行的方法。

    然而,不并是收到通知后,必须马上执行缓存清理工作,因为redis这种远程缓存的清理行为有网络通信成本,也有通讯失败的可能,这样做会导致本地事务被拖长甚至失败。

    Jimmer的缓存系统支持自定义CacheOperator,通过自定义CacheOperator,用户可以覆盖缓存系统的删除行为,将缓存删除任务记录下来但不马上执行,等事务提交后再真正的缓存清理。

    • 不需要可靠性的做法

      1. 自定义CacheOperator并不立即清理缓存,而是用ThreadLocal记录要删除的缓存的键。
      2. 在Spring的After commit事件中,集中清理缓存。
    • 需要可靠性的做法

      1. 自定义CacheOperator并不立即清理缓存,而是用同一个数据库中一个本地事件表记录要删除的缓存的键。
      2. 在Spring的After Commit事件中,从本地事件表中取数据,清理缓存,如果成功,删除本地事件表的数据。
      3. 用一个轮询服务,为第2步的失败兜底。
      提示

      幸运的是,对于触发器的类型为TRANSACTION_ONLY这种情况,Jimmer的Spring Boot Starter已经这样实现了。请参见缓存一致性/Transaction触发器

使用Jimmer Spring Boot Starter

如果使用SpringBoot Starter,设置触发器类型非常简单。

application.propertiesapplication.yml中添加一个配置即可。其名称为jimmer.trigger-type,其值为BINLOG_ONLY | TRANSACTION_ONLY | BOTH

不使用Jimmer Spring Boot Starter

JSqlClient sqlClient = JSqlClient
.newBuilder()
.setTriggerType(TriggerType.BOTH)
...省略其他配置...
.build();

BinLog触发器开发工作

和Transaction触发器不同,BinLog触发器需要采用第三方技术将数据库binlog变更推送到消息队列,并让应用监听消息队列。

因此,仅仅在构建SqlClient对象时把TriggerType指定为BINLOG_ONLY(默认行为)或BOTH是不够的。

消息队列有多有选择,例如Kafka和RabbitMQ;把数据库binlog的增量推送到消息队列的第三方技术也有多种选择,例如MaxWell、Debezium、Canal和DataBus。

Jimmer对这类选择未做任何限制。但为简化问题讨论,本文假设消息队列选用Kafka,推送技术采用Maxwell (MySQL) 和Debezium (Postgres)

警告

由于Debezium本身是kafka-connector,所以,使用Debezium必然导致消息队列是Kafka

外部环境搭建

在开发之前,先要先安装环境,包括数据库、Kafka、以及Maxwell或Debezium。

监听消息队列

无论是选用不同的数据库 (MySQL或Postgres),还是选用不同变更推送技术 (Maxwell或Debezium),都会导致监听代码出现差异。

但无论如何,用户代码都是大同小异的,分为如下4个步骤

  1. 监听消息队列,得到消息体的字符串

  2. 使用ObjectMapper.readTree对消息文本进行弱类型解析

    信息

    所谓弱类型解析,指得到的类型为JsonNode,和业务系统的类型无关

  3. 观察JsonNode的内容,提取出

    • 表名,记为tableName

    • 修改前旧数据的的子JsonNode,记为oldJsonNode

      对于插入操作而言,oldJsonNode为null

    • 修改后新数据的的子JsonNode,记为newJsonNode

      对于删除操作而言,newJsonNode为null

    信息

    数据库和变更推送技术的不同选择导致的监听代码变化,就是指这个步骤。然而,这并不难,对消息内容稍加观察即可轻松实现。

  4. tableNameoldJsonNodenewJsonNode调用JSqlClient.getBinLog().acceptKSqlClient.binLog.accept

下面的例子,分别演示MySQL + MaxwellPostgres + Debezium两种情况。

  • MySQL + Maxwell

    对于MySQL + Maxwell这种情况,消息格式通常是这样的

    {
    "database":"jimmer_demo",
    "table":"book",
    "type":"update",
    "ts":1688592724,
    "xid":11790,
    "commit":true,
    "data":{
    "id":1,
    "name":"Learning GraphQL",
    "edition":1,
    "price":50,
    "store_id":1,
    "tenant":"a",
    "created_time":"2023-07-05 20:21:00",
    "modified_time":"2023-07-05 20:21:00"
    },
    "old":{
    "store_id":2
    }
    }

    稍加观察 (建议inert、update、delete的消息都看一下) 后,不难实现如下消息监听代码

    MaxwellListener.java
    @Component
    public class MaxwellListener {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    private final Caches caches;

    public MaxwellListener(JSqlClient sqlClient) {
    this.caches = sqlClient.getCaches();
    }

    @KafkaListener(topics = "maxwell")
    public void onMaxwellEvent(
    String json,
    Acknowledgment acknowledgment
    ) throws JsonProcessingException {
    JsonNode node = MAPPER.readTree(json);
    String tableName = node.get("table").asText();
    String type = node.get("type").asText();
    JsonNode data = node.get("data");
    switch (type) {
    case "insert":
    binLog.accept(tableName, null, data);
    break;
    case "update":
    binLog.accept(tableName, node.get("old"), data);
    break;
    case "delete":
    binLog.accept(tableName, data, null);
    break;
    }
    acknowledgment.acknowledge();
    }
    }
  • Postgres + Debezium

    对于Postgres + Debezium这种情况,消息格式通常是这样的

    {
    "before": {
    "id": 10,
    "name": "GraphQL in Action",
    "edition": 1,
    "price": "H0A=",
    "store_id": 1,
    "tenant": "b",
    "created_time": 1688590805971294,
    "modified_time": 1688590805971294
    },
    "after": {
    ...略...
    },
    "source": {
    "table": "book",
    ...略...
    },
    ...略...
    }

    我们发现了一些难点,并非所有数据都可以被Jimmer的BinLog映射机制直接识别和转化

    • BigDecimal类型 (Postgres中为NUMERIC(M[, D])) 的属性 (本例中为Book.price),被显示成了Base64编码 (本例中为H0A=)

      该Base64字符串是org.apache.kafka.connect.data.Decimal处理后得到的信息

    • LocalDateTime类型 (Postgres中为TIMESTAMP) 的属性被显示为数字

    信息

    Debezium的文档会对详细介绍它的各种connector是如何处理某些特殊数据的,比如pg-connector是如何处理decimal的

    Debezium中各种connector都会提供丰富的配置,其中有一些可以用于改变默认的数据处理方式,比如改变decimal数据的处理方式,从而避免出现类似的问题。

    然而,Debezium的connector通常为所有系统提供服务,并不会刻意“讨好”某个应用,我们并不能假设其配置总能保证输出Jimmer可以直接理解的数据。

    Jimmer附带的例子故意不对Debezium connector进行配制,让其输出kafka-connector特有的数据,来示范Jimmer如何解决这个问题,本文同样。

    DebeziumCustomizer.java
    package ......;

    import org.apache.kafka.connect.data.Decimal;
    import org.apache.kafka.connect.data.Schema;
    import org.babyfish.jimmer.sql.runtime.Customizer;

    ...省略其他导入语句...

    @Component
    public class DebeziumCustomizer implements Customizer {

    private static final Schema BOOK_PRICE_SCHEMA =
    // Postgres中`BOOK.PRICE`的类型是`NUMERIC(10, 2)`,精度为2
    Decimal.schema(2);

    @Override
    public void customize(JSqlClient.Builder builder) {

    builder.setBinLogPropReader(
    LocalDateTime.class,
    (prop, jsonNode) -> {
    return Instant.ofEpochMilli(
    jsonNode.asLong() / 1000
    ).atZone(ZoneId.systemDefault()).toLocalDateTime();
    }
    );

    builder.setBinLogPropReader(
    BookProps.PRICE,
    (prop, jsonNode) -> {
    byte[] bytes = Base64.getDecoder().decode(jsonNode.asText());
    return Decimal.toLogical(BOOK_PRICE_SCHEMA, bytes);
    }
    );
    }
    }

    setBinLogPropReader允许开发人员自定义如何解析消息中无法直接识别的属性,有两种用法

    • ❶ 给定返回类型,指定一类属性该如何解析

    • ❷ 精确定义某个属性该如何解析

    解决这些问题后,消息监听代码就很容易实现了

    DebeziumListener.java
    @Component
    public class DebeziumListener {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    private final BinLog binLog;

    public DebeziumListener(JSqlClient sqlClient) {
    this.binLog = sqlClient.getBinLog();
    }

    @KafkaListener(topicPattern = "debezium\\..*")
    public void onDebeziumEvent(
    @Payload(required = false) String json,
    Acknowledgment acknowledgment
    ) throws JsonProcessingException {
    if (json != null) { // Debezium在发送数据删除消息后会紧接着发送空消息
    JsonNode node = MAPPER.readTree(json);
    String tableName = node.get("source").get("table").asText();
    binLog.accept(
    tableName,
    node.get("before"),
    node.get("after")
    );
    }
    acknowledgment.acknowledge();
    }
    }

使用示范

如果使用BinLog触发器,请先按上文所述,启用BinLog触发器。

注册处理逻辑

  • 使用Spring Boot Starter

    如果使用SpringBoot Starter,触发器事件会被作为Spring事件发送。

    因此,使用注解@org.springframework.context.event.EventListener响应Spring事件即可

    DatabaseListener.java
    @Component
    public class DatabaseListener {

    @EventListener
    public void onEntityChanged(EntityEvent<?> e) {
    if (e.getImmutableType().getJavaClass() == Book.class) {
    System.out.println("The object `Book` is changed");
    System.out.println("\told: " + e.getOldEntity());
    System.out.println("\tnew: " + e.getNewEntity());
    }
    }

    @EventListener
    public void onAssociationChanged(AssociationEvent e) {
    if (e.isChanged(BookProps.STORE)) {
    System.out.println("The many-to-one association `Book.store` is changed");
    System.out.println("\tbook id: " + e.getSourceId());
    System.out.println("\tdetached book store id: " + e.getDetachedTargetId());
    System.out.println("\tattached book store id: " + e.getAttachedTargetId());
    } else if (e.isChanged(BookStoreProps.BOOKS)) {
    System.out.println("The one-to-many association `BookStore.books` is changed");
    System.out.println("\tbook store id: " + e.getSourceId());
    System.out.println("\tdetached book id: " + e.getDetachedTargetId());
    System.out.println("\tattached book id: " + e.getAttachedTargetId());
    }
    }
    }
  • 使用底层API

    如果不采用Jimmer的SpringBoot starter,需要手动注册事件响应代码。

    sqlClient.getTriggers().addEntityListener(Book.class, e -> {
    System.out.println("The object `Book` is changed");
    System.out.println("\told: " + e.getOldEntity());
    System.out.println("\tnew: " + e.getNewEntity());
    });
    sqlClient.getTriggers().addAssociationListener(BookProps.STORE, e -> {
    System.out.println("The many-to-one association `Book.store` is changed");
    System.out.println("\tbook id: " + e.getSourceId());
    System.out.println("\tdetached book store id: " + e.getDetachedTargetId());
    System.out.println("\tattached book store id: " + e.getAttachedTargetId());
    });
    sqlClient.getTriggers().addAssociationListener(BookStoreProps.BOOKS, e -> {
    System.out.println("The one-to-many association `BookStore.books` is changed");
    System.out.println("\tbook store id: " + e.getSourceId());
    System.out.println("\tdetached book id: " + e.getDetachedTargetId());
    System.out.println("\tattached book id: " + e.getAttachedTargetId());
    });

其中sqlClient.getTriggers()sqClient.triggers用于将处理逻辑注册到默认的触发器上。

也可把上面代码中的sqlClient.getTriggers()sqClient.triggers替换为**sqlClient.getTriggers(true)**,这样,处理逻辑就被注册到Transaction触发器上。

体验触发器

现在,我们触发事件,体验触发器。

BinLog触发器可以监听因任何原因导致的数据库变化,即便绕过应用程序用其他任何手段实现数据库修改,也可以向BinLog触发器发送事件。

比如,你可以直接用SQL工具执行

update BOOK set STORE_ID = 2 where ID = 7;

然而,如果要向Transaction触发器发送事件,则必须通过Jimmer的API修改数据库,例如

BookTable book = Tables.BOOK_TABLE;
sqlClient
.createUpdate(book)
.set(book.storeId(), 2L)
.where(book.id().eq(7L))
.execute();

打印结果

The object `Book` is changed ❶
old: {"id":7,"name":"Programming TypeScript","edition":1,"price":47.50,"store":{"id":1}}
new: {"id":7,"name":"Programming TypeScript","edition":1,"price":47.50,"store":{"id":2}}
The many-to-one association `Book.store` is changed ❷
book id: 7
detached book store id: 1
attached book store id: 2
The one-to-many association `BookStore.books` is changed ❸
book store id: 1
detached book id: 7
attached book id: null
The one-to-many association `BookStore.books` is changed ❹
book store id: 2
detached book id: null
attached book id: 7

其中

  • ❶ 代表了对象变更事件

  • ❷、❸和❹代表了关联变更事件

提示

Jimmer触发器不仅能把简单地把表的变更转化为对象变更事件,还可以把外键的变化和中间表的变化转化为关联变更事件。