触发器
Jimmer支持触发器,用户可以监听数据库的变化。
触发器不仅可以告知对象的变更,也可以告知关联的变更。
触发器类型
触发器分类
-
BinLog触发器
这是默认的触发器类型,不会影响Jimmer本身生成的SQL,具有较高性能,在事务提交后被触发,能监听任何原因导致的数据库变化,包括非Jimmer API引起的数据和变化;但需要底层数据库支持binlog。
-
Transaction触发器
该触发器不无需底层数据库,在事务提交前被触发;工作机制和Alibaba Seata的AT模式类似,会在修改过程中生成额外的查询语句,对修改性能有一定影响,也只能监听因Jimmer API导致的数据库的变化。
两种触发器的区别如下
BinLog触发器 | Transaction触发器 | |
---|---|---|
触发时机 | 事务提交后 | 事务提交前 |
性能 | 高 | 低 |
可监听的数据库变化 | 任何原因导致的数据库变化 | 仅因当前应用调用Jimmer API导致的数据库变化 |
数据库 要求 | 支持且启用binlog | 无任何要求 |
工作原理 | 利用第三方技术将数据库binlog变更推送到消息队列,Jimmer应用监听消息队列 | Jimmer的任何修改操作API均自动植入额外的SQL查询以找寻数据变更,和Alibaba Seata的AT模式类似 |
除了这个表格中的区别外,两种触发器为用户提供的通知数据完全相同。
推荐用法
-
BinLog触发器
BinLog触发器在事务提交后触发,面对无法更改的既定事实。
即,BinLog触发器对原事务毫无影响,被允许做耗时操作。所以适合在其处理逻辑中执行多个任务,尤其是这些任务
- 缓存一致性维护
- 异构数据源同步
- 以异步方式向其它微服务发送消息
-
Transaction触发器
Transaction触发器在事务提交前触发,其处理逻辑会直接植入当前事务。
如果其事件处理逻辑异常,会导致当前业务修改失败;如果其处理逻辑不能很快完成,会导致当前事务长时间不释放资源。
因此,Transaction触发器适合在当前事务中追加更多的修改行为,不会破坏原子性。
适合在数据库发生变化时通过添加额外的修改来实现通用性很强的部分业务逻辑。
设置触发器类型
概 念
在讨论设置触发器类型之前,我们先看开发人员如何使用触发器
-
sqlClient.getTriggers()
或sqlClient.getTriggers(false)
: 优先返回BinLog触发器,如果没有,则返回Transaction触发器。 -
sqlClient.getTriggers(true)
: 明确返回Transaction触发器,如果没有,抛出异常。
为了影响后续用户通过sqlClient.getTriggers
能获取的触发器类型,需要在构建SqlClient时指定TriggerType。
TriggerType有三个取值
-
BINLOG_ONLY:
仅支持BinLog触发器,这是默认配置。
sqlClient.getTriggers()
和sqlClient.getTriggers(false)
返回BinLog触发器对象sqlClient.getTriggers(true)
会异常,无法返回Transaction触发器对象
-
TRANSACTION_ONLY:
仅支持Transaction触发器。 无论
sqlClient.getTriggers
的参数为何,都会返回同一个Transaction触发器对象 -
BOTH:
同时支持BinLog触发器何Transaction触发器。
sqlClient.getTriggers()
和sqlClient.getTriggers(false)
返回BinLog触发器对象sqlClient.getTriggers(true)
返回Transaction触发器对象
这里,用一张表格来对比三种情况
触发器类型 | getTriggers(false) | getTriggers(true) |
---|---|---|
BINLOG_ONLY | BinLog Triggers专用对象 | 抛出异常 |
TRANSACTION_ONLY | ||
BOTH | BinLog Triggers专用对象 | Transaction Triggers专用对象 |
Q & A
-
Q: 为什么默认
BINLOG_ONLY
模式?A: Transaction触发器会在所有保存行为中植入额外的查询用于模拟触发器,对性能有影响,默认不支持。
-
Q: 为什么
TRANSACTION_ONLY
模式下,两种触发器API共享同一个对象?A: Jimmer内置的缓存一致性策略,一定使用
sqlClient.getTriggers(false)
,开发人员无法改变。这样做的目的是,让缓存一致性维护工作不影响修改事务,在事务提交后才开始执行。因此,原始事务不会被拉长,从而快速结束释放锁资源。
但是,并非所有数据库产品都支持binlog。这时,
getTriggers(false)
返回Transaction触发器对象,冒充BinLog触发器对象,接手原本应该由BinLog触发器负责的缓存一致性维护工作。也就是说,
TRANSACTION_ONLY
是为不支持binlog的数据库设计的,这是采用该模式的唯一理由。 -
Q:
BOTH
模式下,存在两个不同的触发器API对象,是不是表示任何修改都有两次响应机会?A: 是的,而且这是一个重要的功能。
和Jimmer内置的缓存一致性机制必须由
sqlClient.getTriggers(false)
驱动不同;用户的业务代码没有这个限制,开发人员可以自由决定某个处理逻辑究竟注册到sqlClient.getTriggers(false)
还是sqlClient.getTriggers(true)
,甚至同时向二者注册。-
如果开发人员的事件响应逻辑包含一些数据联动修改,必须参与当前事务的原子性作用域,就应该选择将处理逻辑注册到
sqlClient.getTriggers(true)
-
如果开发人员的事件响应逻辑没必要参与当前事务,就应该选择将处理逻辑注册到
sqlClient.getTriggers(false)
,让当前事务尽快结束,尽快释放锁资源 -
如果开发人员的事件响应逻辑同时包含以上两种情况,就应该将处理逻辑一分为二,然后分别注册到两种触发器上。
警告如果开发人员为两种触发器注册了同一个事件响应回调,那么每次事件通知时这个回调的确会被执行两次。
这时,区分两次调用就非常重要了。回调方法的参数是一个对象,可以获取JDBC连接对象,其值是否为null可用作区分二者的标准:
- 非null,第一次回调,由Transaction Trigger引起
- null,第二次回调,由BinLog Trigger引起
-
-
Q: 对于不支持binlog的数据库而言,其缓存一致性清理是不是无法在事务提交后做?
A: 并非如此,如果开发人员愿意优化,可以做到。
诚然,这种数据库无法支持BinLog触发器,采用Transaction触发器在事务生命周期内得到数据变更通知是唯一可行的方法。
然而,不并是收到通知后,必须马上执行缓存清理工作,因为redis这种远程缓存的清理行为有网络通信成本,也有通讯失败的可能,这样做会导致本地事务被拖长甚至失 败。
Jimmer的缓存系统支持自定义CacheOperator,通过自定义CacheOperator,用户可以覆盖缓存系统的删除行为,将缓存删除任务记录下来但不马上执行,等事务提交后再真正的缓存清理。
-
不需要可靠性的做法
- 自定义CacheOperator并不立即清理缓存,而是用ThreadLocal记录要删除的缓存的键。
- 在Spring的
After commit
事件中,集中清理缓存。
-
需要可靠性的做法
- 自定义CacheOperator并不立即清理缓存,而是用同一个数据库中一个本地事件表记录要删除的缓存的键。
- 在Spring的
After Commit
事件中,从本地事件表中取数据,清理缓存,如果成功,删除本地事件表的数据。 - 用一个轮询服务,为第2步的失败兜底。
提示幸运的是,对于触发器的类型为
TRANSACTION_ONLY
这种情况,Jimmer的Spring Boot Starter已经这样实现了。请参见缓存一致性/Transaction触发器
-
使用Jimmer Spring Boot Starter
如果使用SpringBoot Starter,设置触发器类型非常简单。
在application.properties
或application.yml
中添加一个配置即可。其名称为jimmer.trigger-type
,其值为BINLOG_ONLY
| TRANSACTION_ONLY
| BOTH
。
不使用Jimmer Spring Boot Starter
- Java
- Kotlin
JSqlClient sqlClient = JSqlClient
.newBuilder()
.setTriggerType(TriggerType.BOTH)
...省略其他配置...
.build();
javax.sql.DataSource dataSource = ...;
var sqlClient = newKSqlClient {
setTriggerType(TriggerType.BOTH)
...省略其他配置...
}
BinLog触发器开发工作
和Transaction触发器不同,BinLog触发器需要采用第三方技术将数据库binlog变更推送到消息队列,并让应用监听消息队列。
因此,仅仅在构建SqlClient对 象时把TriggerType指定为BINLOG_ONLY
(默认行为)或BOTH
是不够的。
消息队列有多有选择,例如Kafka和RabbitMQ;把数据库binlog的增量推送到消息队列的第三方技术也有多种选择,例如MaxWell、Debezium、Canal和DataBus。
Jimmer对这类选择未做任何限制。但为简化问题讨论,本文假设消息队列选用Kafka,推送技术采用Maxwell (MySQL) 和Debezium (Postgres)。
由于Debezium本身是kafka-connector,所以,使用Debezium必然导致消息队列是Kafka
外部环境搭建
在开发之前,先要先安装环境,包括数据库、Kafka、以及Maxwell或Debezium。
-
Maxwell
-
进入jimmer-examples/env-with-cache/maxwell所对应的
git clone
后本地目录 -
执行
bash ./install.sh
-
-
Debezium
-
进入jimmer-examples/env-with-cache/debezium所对应的
git clone
后本地目录 -
执行
bash ./install.sh
-
监听消息队列
无论是选用不同的数据库 (MySQL或Postgres),还是选用不同变更推送技术 (Maxwell或Debezium),都会导致监听代码出现差异。
但无论如何,用户代码都是大同小异的,分为如下4个步骤
-
监听消息队列,得到消息体的字符串
-
使用ObjectMapper.readTree对消息文本进行弱类型解析
信息所谓弱类型解析,指得到的类型为JsonNode,和业务系统的类型无关
-
观察JsonNode的内容,提取出
-
表名,记为
tableName
-
修改前旧数据的的子JsonNode,记为
oldJsonNode
对于插入操作而言,
oldJsonNode
为null -
修改后新数据的的子JsonNode,记为
newJsonNode
对于删除操作而言,
newJsonNode
为null
信息数据库和变更推送技术的不同选择导致的监听代码变化,就是指这个步骤。然而,这并不难,对消息内容稍加观察即可轻松实现。
-
-
以
tableName
、oldJsonNode
和newJsonNode
调用JSqlClient.getBinLog().accept
或KSqlClient.binLog.accept
下面的例子,分别演示MySQL + Maxwell
和Postgres + Debezium
两种情况。
-
MySQL + Maxwell
对于
MySQL + Maxwell
这种情况,消息格式通常是这样的{
"database":"jimmer_demo",
"table":"book",
"type":"update",
"ts":1688592724,
"xid":11790,
"commit":true,
"data":{
"id":1,
"name":"Learning GraphQL",
"edition":1,
"price":50,
"store_id":1,
"tenant":"a",
"created_time":"2023-07-05 20:21:00",
"modified_time":"2023-07-05 20:21:00"
},
"old":{
"store_id":2
}
}稍加观察 (建议inert、update、delete的消息都看一下) 后,不难实现如下消息监听代码
- Java
- Kotlin
MaxwellListener.java@Component
public class MaxwellListener {
private static final ObjectMapper MAPPER = new ObjectMapper();
private final Caches caches;
public MaxwellListener(JSqlClient sqlClient) {
this.caches = sqlClient.getCaches();
}
@KafkaListener(topics = "maxwell")
public void onMaxwellEvent(
String json,
Acknowledgment acknowledgment
) throws JsonProcessingException {
JsonNode node = MAPPER.readTree(json);
String tableName = node.get("table").asText();
String type = node.get("type").asText();
JsonNode data = node.get("data");
switch (type) {
case "insert":
binLog.accept(tableName, null, data);
break;
case "update":
binLog.accept(tableName, node.get("old"), data);
break;
case "delete":
binLog.accept(tableName, data, null);
break;
}
acknowledgment.acknowledge();
}
}MaxwellListener.kt@Component
class MaxwellListener(sqlClient: KSqlClient) {
private val caches: KCaches = sqlClient.caches
@KafkaListener(topics = ["maxwell"])
fun onMaxwellEvent(
json: String,
acknowledgment: Acknowledgment
) {
val node = MAPPER.readTree(json)
val tableName = node["table"].asText()
val type = node["type"].asText()
val data = node["data"]
when (type) {
"insert" ->
binLog.accept(tableName, null, data)
"update" ->
binLog.accept(tableName, node["old"], data)
"delete" ->
binLog.accept(tableName, data, null)
}
acknowledgment.acknowledge()
}
companion object {
@JvmStatic
private val MAPPER = ObjectMapper()
}
} -
Postgres + Debezium
对于
Postgres + Debezium
这种情况,消息格式通常是这样的{
"before": {
"id": 10,
"name": "GraphQL in Action",
"edition": 1,
"price": "H0A=",
"store_id": 1,
"tenant": "b",
"created_time": 1688590805971294,
"modified_time": 1688590805971294
},
"after": {
...略...
},
"source": {
"table": "book",
...略...
},
...略...
}我们发现了一些难点,并非所有数据都可以被Jimmer的BinLog映射机制直接识别和转化
-
BigDecimal
类型 (Postgres中为NUMERIC(M[, D])
) 的属性 (本例中为Book.price
),被显示成了Base64编码 (本例中为H0A=
)该Base64字符串是org.apache.kafka.connect.data.Decimal处理后得到的信息
-
LocalDateTime
类型 (Postgres中为TIMESTAMP) 的属性被显示为数字
信息Debezium的文档会对详细介绍它的各种connector是如何处理某些特殊数据的,比如pg-connector是如何处理decimal的。
Debezium中各种connector都会提供丰富的配置,其中有一些可以用于改变默认的数据处理方式,比如改变decimal数据的处理方式,从而避免出现类似的问题。
然而,Debezium的connector通常为所有系统提供服务,并不会刻意“讨好”某个应用,我们并不能假设其配置总能保证输出Jimmer可以直接理解的数据。
Jimmer附带的例子故意不对Debezium connector进行配制,让其输出kafka-connector特有的数据,来示范Jimmer如何解决这个问题,本文同样。
- Java
- Kotlin
DebeziumCustomizer.javapackage ...略...;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.babyfish.jimmer.sql.runtime.Customizer;
...省略其他导入语句...
@Component
public class DebeziumCustomizer implements Customizer {
private static final Schema BOOK_PRICE_SCHEMA =
// Postgres中`BOOK.PRICE`的类型是`NUMERIC(10, 2)`,精度为2
Decimal.schema(2);
@Override
public void customize(JSqlClient.Builder builder) {
builder.setBinLogPropReader( ❶
LocalDateTime.class,
(prop, jsonNode) -> {
return Instant.ofEpochMilli(
jsonNode.asLong() / 1000
).atZone(ZoneId.systemDefault()).toLocalDateTime();
}
);
builder.setBinLogPropReader(
BookProps.PRICE, ❷
(prop, jsonNode) -> {
byte[] bytes = Base64.getDecoder().decode(jsonNode.asText());
return Decimal.toLogical(BOOK_PRICE_SCHEMA, bytes);
}
);
}
}DebeziumCustomizer.ktpackage ...略...
import org.apache.kafka.connect.data.Decimal
import org.apache.kafka.connect.data.Schema
import org.babyfish.jimmer.sql.kt.cfg.KCustomizer
...省略其他导入语句...
@Component
class DebeziumCustomizer : KCustomizer {
override fun customize(dsl: KSqlClientDsl) {
dsl.setBinLogPropReader(
LocalDateTime::class ❶
) { _, jsonNode ->
Instant.ofEpochMilli(
jsonNode.asLong() / 1000
).atZone(ZoneId.systemDefault()).toLocalDateTime()
}
dsl.setBinLogPropReader(
Book::price ❷
) { _, jsonNode ->
Decimal.toLogical(
BOOK_PRICE_SCHEMA,
Base64.getDecoder().decode(jsonNode.asText())
)
}
}
companion object {
private val BOOK_PRICE_SCHEMA =
// Postgres中`BOOK.PRICE`的类型是`NUMERIC(10, 2)`,精度为2
Decimal.schema(2)
}
}setBinLogPropReader
允许开发人员自定义如何解析消息中无法直接识别的属性,有两种用法-
❶ 给定返回类型,指定一类属性该如何解析
-
❷ 精确定义某个属性该如何解析
解决这些问题后,消息监听代码就很容易实现了
- Java
- Kotlin
DebeziumListener.java@Component
public class DebeziumListener {
private static final ObjectMapper MAPPER = new ObjectMapper();
private final BinLog binLog;
public DebeziumListener(JSqlClient sqlClient) {
this.binLog = sqlClient.getBinLog();
}
@KafkaListener(topicPattern = "debezium\\..*")
public void onDebeziumEvent(
@Payload(required = false) String json,
Acknowledgment acknowledgment
) throws JsonProcessingException {
if (json != null) { // Debezium在发送数据删除消息后会紧接着发送空消息
JsonNode node = MAPPER.readTree(json);
String tableName = node.get("source").get("table").asText();
binLog.accept(
tableName,
node.get("before"),
node.get("after")
);
}
acknowledgment.acknowledge();
}
}DebeziumListener.kt@Component
class DebeziumListener(sqlClient: KSqlClient) {
private val binLog: BinLog = sqlClient.binLog
@KafkaListener(topicPattern = """debezium\..*""")
fun onDebeziumEvent(
@Payload(required = false) json: String?,
acknowledgment: Acknowledgment
) {
if (json !== null) { // Debezium在发送数据删除消息后会紧接着发送空消息
val node: JsonNode = MAPPER.readTree(json)
val tableName: String = node["source"]["table"].asText()
binLog.accept(
tableName,
node["before"],
node["after"]
)
}
acknowledgment.acknowledge()
}
companion object {
private val MAPPER = ObjectMapper()
}
} -
使用示范
如果使用BinLog触发器,请先按上文所述,启用BinLog 触发器。
注册处理逻辑
-
使用Spring Boot Starter
如果使用SpringBoot Starter,触发器事件会被作为Spring事件发送。
因此,使用注解
@org.springframework.context.event.EventListener
响应Spring事件即可- Java
- Kotlin
DatabaseListener.java@Component
public class DatabaseListener {
@EventListener
public void onEntityChanged(EntityEvent<?> e) {
if (e.getImmutableType().getJavaClass() == Book.class) {
System.out.println("The object `Book` is changed");
System.out.println("\told: " + e.getOldEntity());
System.out.println("\tnew: " + e.getNewEntity());
}
}
@EventListener
public void onAssociationChanged(AssociationEvent e) {
if (e.isChanged(BookProps.STORE)) {
System.out.println("The many-to-one association `Book.store` is changed");
System.out.println("\tbook id: " + e.getSourceId());
System.out.println("\tdetached book store id: " + e.getDetachedTargetId());
System.out.println("\tattached book store id: " + e.getAttachedTargetId());
} else if (e.isChanged(BookStoreProps.BOOKS)) {
System.out.println("The one-to-many association `BookStore.books` is changed");
System.out.println("\tbook store id: " + e.getSourceId());
System.out.println("\tdetached book id: " + e.getDetachedTargetId());
System.out.println("\tattached book id: " + e.getAttachedTargetId());
}
}
}DatabaseListener.kt@Component
public class DatabaseListener {
@EventListener
fun onEntityChanged(e: EntityEvent<*>) {
if (e.ImmutableType.javaClass == Book::class.java) {
println("The object `Book` is changed")
println("\told: ${it.oldEntity}")
println("\tnew: ${it.newEntity}")
}
}
@EventListener
fun onAssociationChanged(e: AssociationEvent) {
if (e.isChanged(Book::store)) {
println("The many-to-one association `Book.store` is changed")
println("\tbook id: ${it.sourceId}")
println("\tdetached book store id: ${it.detachedTargetId}")
println("\tattached book store id: ${it.attachedTargetId}")
} else if (e.isChanged(BookStore::books)) {
println("The one-to-many association `BookStore.books` is changed")
println("\tbook store id: ${it.sourceId}")
println("\tdetached book id: ${it.detachedTargetId}")
println("\tattached book id: ${it.attachedTargetId}")
}
}
} -
使用底层API
如果不采用Jimmer的SpringBoot starter,需要手动注册事件响应代码。
- Java
- Kotlin
sqlClient.getTriggers().addEntityListener(Book.class, e -> {
System.out.println("The object `Book` is changed");
System.out.println("\told: " + e.getOldEntity());
System.out.println("\tnew: " + e.getNewEntity());
});
sqlClient.getTriggers().addAssociationListener(BookProps.STORE, e -> {
System.out.println("The many-to-one association `Book.store` is changed");
System.out.println("\tbook id: " + e.getSourceId());
System.out.println("\tdetached book store id: " + e.getDetachedTargetId());
System.out.println("\tattached book store id: " + e.getAttachedTargetId());
});
sqlClient.getTriggers().addAssociationListener(BookStoreProps.BOOKS, e -> {
System.out.println("The one-to-many association `BookStore.books` is changed");
System.out.println("\tbook store id: " + e.getSourceId());
System.out.println("\tdetached book id: " + e.getDetachedTargetId());
System.out.println("\tattached book id: " + e.getAttachedTargetId());
});sqlClient.triggers.addEntityListener(Book::class) {
println("The object `Book` is changed")
println("\told: ${it.oldEntity}")
println("\tnew: ${it.newEntity}")
}
sqlClient.triggers.addAssociationListener(Book::store) {
println("The many-to-one association `Book.store` is changed")
println("\tbook id: ${it.sourceId}")
println("\tdetached book store id: ${it.detachedTargetId}")
println("\tattached book store id: ${it.attachedTargetId}")
}
sqlClient.triggers.addAssociationListener(BookStore::books) {
println("The one-to-many association `BookStore.books` is changed")
println("\tbook store id: ${it.sourceId}")
println("\tdetached book id: ${it.detachedTargetId}")
println("\tattached book id: ${it.attachedTargetId}")
}
其中sqlClient.getTriggers()
或sqClient.triggers
用于将处理逻辑注册到默认的触发器上。
也可把上面代码中的sqlClient.getTriggers()
或sqClient.triggers
替换为**sqlClient.getTriggers(true)
**,这样,处理逻辑就被注册到Transaction触发器上。
体验触发器
现在,我们触发事件,体验触发器。
BinLog触发器可以监听因任何原因导致的数据库变化,即便绕过应用程序用其他任何手段实现数据库修改,也可以向BinLog触发器发送事件。
比如,你可以直接用SQL工具执行
update BOOK set STORE_ID = 2 where ID = 7;
然而,如果要向Transaction触发器发送事件,则必须通过Jimmer的API修改数据库,例如
- Java
- Kotlin
BookTable book = Tables.BOOK_TABLE;
sqlClient
.createUpdate(book)
.set(book.storeId(), 2L)
.where(book.id().eq(7L))
.execute();
sqlClient
.createUpdate(Book::class) {
set(table.store.id, 2L)
where(table.id eq 7L)
}
.execute()
打印结果
The object `Book` is changed ❶
old: {"id":7,"name":"Programming TypeScript","edition":1,"price":47.50,"store":{"id":1}}
new: {"id":7,"name":"Programming TypeScript","edition":1,"price":47.50,"store":{"id":2}}
The many-to-one association `Book.store` is changed ❷
book id: 7
detached book store id: 1
attached book store id: 2
The one-to-many association `BookStore.books` is changed ❸
book store id: 1
detached book id: 7
attached book id: null
The one-to-many association `BookStore.books` is changed ❹
book store id: 2
detached book id: null
attached book id: 7
其中
-
❶ 代表了对象变更事件
-
❷、❸和❹代表了关联变更事件
Jimmer触发器不仅能把简单地把表的变更转化为对象变更事件,还可以把外键的变化和中间表的变化转化为关联变更事件。