39、Flink 的CDC 格式:maxwell部署以及示例_flink cdc和maxwell

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列
    本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列
    本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列
    本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列
    本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列
    本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引



本文详细的介绍了maxwell的部署、2个示例以及在Flink 中通过maxwell将数据变化信息同步到Kafka中,然后通过Flink SQL client进行读取。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文除了maven依赖外,还依赖maxwell、kafka和flink的环境。

一、maxwell Format

1、maxwell介绍

Maxwell是一个CDC(Changelog Data Capture)工具,可以将MySQL中的数据变化实时流式传输到Kafka、Kinesis和其他流式连接器中。Maxwell为变更日志提供了统一的格式模式,并支持使用JSON序列化消息。

Flink支持将Maxwell JSON消息解释为INSERT/UPDATE/DELETE Flink SQL系统中的消息。在许多情况下,这对于利用此功能非常有用,例如

  • 将增量数据从数据库同步到其他系统
  • 审核日志
  • 数据库上的实时物化视图
  • 数据库表的临时连接更改历史等等。

Flink还支持将Flink SQL中的INSERT/UPDATE/DELETE消息编码为Maxwell JSON消息,并发送到Kafka等外部系统。但是,截至Flink 1.17版本,Flink无法将UPDATE_BEFORE和UPDATE_AFTER合并为一条UPDATE消息。因此,Flink将UPDATE_BEFORE和UDPATE_AFTER编码为DELETE和INSERT Maxwell消息。

2、binlog设置及验证

设置binlog需要监控的数据库,本示例使用的数据库是mysql5.7

1)、配置

本示例设置的参数参考下面的配置

[root@server4 ~]# cat /etc/my.cnf
# For advice on how to change settings please see
# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html

[mysqld]
......

log-bin=mysql-bin  # log-bin的名称,可以是任意名称
binlog-format=row  # 推荐该参数,其他的参数视情况而定,比如mixed、statement
server_id=1 # mysql集群环境中不要重复
binlog_do_db=test # test是mysql的数据库名称,如果监控多个数据库,可以添加多个binlog_do_db即可,例如下面示例
# binlog_do_db=test2
# binlog_do_db=test3
.....
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • STATEMENT模式(SBR)
    每一条会修改数据的sql语句会记录到binlog中。优点是并不需要记录每一条sql语句和每一行的数据变化,减少了binlog日志量,节约IO,提高性能。缺点是在某些情况下会导致master-slave中的数据不一致(如sleep()函数, last_insert_id(),以及user-defined functions(udf)等会出现问题)

  • ROW模式(RBR)
    不记录每条sql语句的上下文信息,仅需记录哪条数据被修改了,修改成什么样了。而且不会出现某些特定情况下的存储过程、或function、或trigger的调用和触发无法被正确复制的问题。缺点是会产生大量的日志,尤其是alter table的时候会让日志暴涨。

  • MIXED模式(MBR)
    以上两种模式的混合使用,一般的复制使用STATEMENT模式保存binlog,对于STATEMENT模式无法复制的操作使用ROW模式保存binlog,MySQL会根据执行的SQL语句选择日志保存方式。

2)、重启mysql

保存配置后重启mysql

service mysqld restart
  • 1

3)、验证

重启后,可以通过2个简单的方法验证是否设置成功。

mysql默认的安装目录:cd /var/lib/mysql

[root@server4 ~]# cd /var/lib/mysql
[root@server4 mysql]# ll
......
-rw-r----- 1 mysql mysql    154 110 2022 mysql-bin.000001
-rw-r----- 1 mysql mysql       1197 116 12:21 mysql-bin.index
.....
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

查看mysql-bin.000001文件是否生成,且其大小为154字节。mysql-bin.000001是mysql重启的次数,重启2次则为mysql-bin.000002
在test数据库中创建或添加数据,mysql-bin.000001的大小是否增加

以上情况满足,则说明binlog配置正常

3、部署

1)、下载

去其官网:https://maxwells-daemon.io/quickstart/下载需要的版本。
本示例使用的是:maxwell-1.29.2.tar.gz 注意其不同版本对jdk的要求,最新版本要求jdk11.

2)、解压

解压的目录/usr/local/bigdata/maxwell-1.29.2

tar -zvxf maxwell-1.29.2.tar.gz -C /usr/local/bigdata
[alanchan@server3 maxwell-1.29.2]$ ll
总用量 108
drwxr-xr-x 2 alanchan root  4096 116 05:45 bin
-rw-r--r-- 1 alanchan root 25133 124 2021 config.md
-rw-r--r-- 1 alanchan root 11970 124 2021 config.properties.example
-rw-r--r-- 1 alanchan root 10259 422 2020 kinesis-producer-library.properties.example
drwxr-xr-x 3 alanchan root 12288 127 2021 lib
-rw-r--r-- 1 alanchan root   548 422 2020 LICENSE
-rw-r--r-- 1 alanchan root   470 124 2021 log4j2.xml
-rw-r--r-- 1 alanchan root  3328 127 2021 quickstart.md
-rw-r--r-- 1 alanchan root  1429 127 2021 README.md
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

3)、创建元数据库

该步骤需要创建一个mysql数据库,用以保存maxwell的元数据,至于访问这个数据库的用户名和密码则视情况而定,下面的内容是其官方上的操作,也就是创建用户、授权。

本文的示例中使用的是root用户,创建的数据库名称为maxwell。

mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY 'XXXXXX';
mysql> CREATE USER 'maxwell'@'localhost' IDENTIFIED BY 'XXXXXX';

mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%';
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'localhost';

mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'localhost';
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

4)、启动方式

其提供了2种启动方式,即通过命令行参数的形式和通过配置文件的形式,下面是给出的示例

  • 命令行参数形式,输出到控制台
    不需要做任何配置即可直接使用,
maxwell --user='root' --password='123456' --host='192.168.10.44' --producer=stdout
# user 和 password 是连接mysql元数据库的账号和密码
# host是被监控的mysql的ip
# producer是maxwell的输出类型,比如stdout、kafka等
  • 1
  • 2
  • 3
  • 4
  • 配置文件方式,输出到控制台
maxwell --config ../config.properties
# config.properties文件修改内容如下,其他的保持不变,也可以根据自己的需要修改
producer=stdout
# mysql login info
host=192.168.10.44
user=root
password=123456
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

4、示例1:maxwell CDC 输出至控制台

1)、启动maxwell

部署完成后,不需要做任何的改动即可执行下面的命令

maxwell --user='root' --password='123456' --host='192.168.10.44' --producer=stdout
  • 1

2)、操作mysql监控的数据库,观察其控制台输出

在mysql中增加、修改和删除数据后,maxwell控制台的输出内容如下

[alanchan@server3 bin]$ maxwell --user='root' --password='123456' --host='192.168.10.44' --producer=stdout
Using kafka version: 1.0.0
{"database":"cdctest","table":"userscoressink","type":"update","ts":1705653290,"xid":20392,"commit":true,"data":{"name":"alanchanchn","scores":109.0},"old":{"scores":199.0}}
{"database":"cdctest","table":"userscoressink","type":"insert","ts":1705653456,"xid":20935,"commit":true,"data":{"name":"alan1","scores":5.0}}
{"database":"cdctest","table":"userscoressink","type":"update","ts":1705653461,"xid":20951,"commit":true,"data":{"name":"alan1","scores":109.0},"old":{"scores":5.0}}
{"database":"cdctest","table":"userscoressink","type":"delete","ts":1705653465,"xid":20967,"commit":true,"data":{"name":"alan1","scores":109.0}}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

5、示例2:maxwell CDC 输出至kafka

1)、启动maxwell

部署完成后,不需要做任何的改动即可执行下面的命令

maxwell --user='root' --password='rootroot' --host='192.168.10.44' --producer=kafka --kafka.bootstrap.servers=server1:9092,server2:9092,server3:9092 --kafka_topic=alan_maxwell_to_kafka_topic

[alanchan@server3 bin]$ maxwell --user='root' --password='rootroot' --host='192.168.10.44' --producer=kafka --kafka.bootstrap.servers=server1:9092,server2:9092,server3:9092 --kafka_topic=alan_maxwell_to_kafka_topic
Using kafka version: 1.0.0
  • 1
  • 2
  • 3
  • 4

2)、通过命令行打开kafka消费者

kafka-console-consumer.sh --bootstrap-server server1:9092 --topic alan_maxwell_to_kafka_topic --from-beginning
  • 1

3)、操作mysql监控的数据库,观察其控制台输出

在mysql中增加、修改和删除数据后,maxwell控制台的输出内容如下

[alanchan@server1 bin]$ cd ../../kafka_2.12-3.0.0/bin/
[alanchan@server1 bin]$ kafka-console-consumer.sh --bootstrap-server server1:9092 --topic alan_maxwell_to_kafka_topic --from-beginning
{"database":"cdctest","table":"userscoressink","type":"insert","ts":1705654206,"xid":22158,"commit":true,"data":{"name":"test","scores":100.0}}
{"database":"cdctest","table":"userscoressink","type":"update","ts":1705654220,"xid":22196,"commit":true,"data":{"name":"test","scores":200.0},"old":{"scores":100.0}}
{"database":"cdctest","table":"userscoressink","type":"delete","ts":1705654224,"xid":22210,"commit":true,"data":{"name":"test","scores":200.0}}
  • 1
  • 2
  • 3
  • 4
  • 5

二、Flink 与 maxwell 实践

为了使用maxwell格式,使用构建自动化工具(如Maven或SBT)的项目和带有SQL JAR包的SQLClient都需要以下依赖项。

1、maven依赖

该依赖在flink自建工程中已经包含。

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-json</artifactId>
  <version>1.17.1</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

有关如何部署 maxwell 以将变更日志同步到消息队列,请参阅上文的具体事例或想了解更多的信息参考maxwell 文档

2、Flink sql client 建表示例

maxwell 为变更日志提供了统一的格式,下面是一个从 MySQL 库 userscoressink表中捕获更新操作的简单示例:

{
	"database": "cdctest",
	"table": "userscoressink",
	"type": "update",
	"ts": 1705654220,
	"xid": 22196,
	"commit": true,
	"data": {
		"name": "test",
		"scores": 200.0
	},
	"old": {
		"scores": 100.0
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

MySQL userscoressink表有2列(name,scores)。上面的 JSON 消息是 userscoressink表上的一个更新事件,表示数据上scores字段值从100变更成为200。

消息已经同步到了一个 Kafka 主题:alan_maxwell_to_kafka_topic,那么就可以使用以下DDL来从这个主题消费消息并解析变更事件。

具体启动maxwell参考本文的第一部分的kafka示例,其他不再赘述。下面的部分仅仅是演示maxwell环境都正常后,在Flink SQL client中的操作。

-- 元数据与 MySQL "userscoressink" 表完全相同

CREATE TABLE userscoressink (
  name STRING,
  scores FLOAT
) WITH (
 'connector' = 'kafka',
 'topic' = 'alan_maxwell_to_kafka_topic',
 'properties.bootstrap.servers' = 'server1:9092,server2:9092,server3:9092',
 'properties.group.id' = 'testGroup',
 'scan.startup.mode' = 'earliest-offset',
 'format' = 'maxwell-json' -- 使用 maxwell-json 格式
);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

将 Kafka 主题注册成 Flink 表之后,就可以将 maxwell消息用作变更日志源。

-- 验证,在mysql中新增、修改和删除数据,观察flink sql client 的数据变化
Flink SQL> show tables;
Empty set

Flink SQL> CREATE TABLE userscoressink (
>   name STRING,
>   scores FLOAT
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'alan_maxwell_to_kafka_topic',
>  'properties.bootstrap.servers' = 'server1:9092,server2:9092,server3:9092',
>  'properties.group.id' = 'testGroup',
>  'scan.startup.mode' = 'earliest-offset',
>  'format' = 'maxwell-json' -- 使用 maxwell-json 格式
> );
[INFO] Execute statement succeed.

Flink SQL> select * from userscoressink;
+----+--------------------------------+--------------------------------+
| op |                           name |                         scores |
+----+--------------------------------+--------------------------------+
| +I |                           test |                          100.0 |
| -U |                           test |                          100.0 |
| +U |                           test |                          200.0 |
| -D |                           test |                          200.0 |
Query terminated, received a total of 4 rows

-- 关于MySQL "userscoressink" 表的实时物化视图
-- 按name分组,对scores进行求和

Flink SQL> select name ,sum(scores) sum_scores from userscoressink group by name;
+----+--------------------------------+--------------------------------+
| op |                           name |                     sum_scores |
+----+--------------------------------+--------------------------------+
| +I |                           test |                          100.0 |
| -D |                           test |                          100.0 |
| +I |                           test |                          200.0 |
| -D |                           test |                          200.0 |
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

3、Available Metadata

以下格式元数据可以在表定义中公开为只读(VIRTUAL)列。

只有当相应的连接器转发格式元数据时,格式元数据字段才可用。

截至Flink 1.17版本,只有Kafka连接器能够公开其值格式的元数据字段。
在这里插入图片描述

以下示例显示了如何访问Kafka中的Maxwell元数据字段:

CREATE TABLE userscoressink2(
  origin_database STRING METADATA FROM 'value.database' VIRTUAL,
  origin_table STRING METADATA FROM 'value.table' VIRTUAL,
  origin_primary_key_columns ARRAY<STRING> METADATA FROM 'value.primary-key-columns' VIRTUAL,
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  name STRING,
  scores FLOAT
) WITH (
 'connector' = 'kafka',
 'topic' = 'alan_maxwell_to_kafka_topic',
 'properties.bootstrap.servers' = 'server1:9092,server2:9092,server3:9092',
 'properties.group.id' = 'testGroup',
 'scan.startup.mode' = 'earliest-offset',
 'format' = 'maxwell-json' 
);

# 操作步骤如下
Flink SQL> CREATE TABLE userscoressink2(
>   origin_database STRING METADATA FROM 'value.database' VIRTUAL,
>   origin_table STRING METADATA FROM 'value.table' VIRTUAL,
>   origin_primary_key_columns ARRAY<STRING> METADATA FROM 'value.primary-key-columns' VIRTUAL,
>   origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
>   name STRING,
>   scores FLOAT
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'alan_maxwell_to_kafka_topic',
>  'properties.bootstrap.servers' = 'server1:9092,server2:9092,server3:9092',
>  'properties.group.id' = 'testGroup',
>  'scan.startup.mode' = 'earliest-offset',
>  'format' = 'maxwell-json' 
> );
[INFO] Execute statement succeed.

Flink SQL> select * from userscoressink2;
+----+----------------+---------------+---------------------------+-------------------------+-----+-------+
| op |origin_database |  origin_table |origin_primary_key_columns |               origin_ts |name |scores |
+----+----------------+---------------+---------------------------+-------------------------+-----+-------+
| +I |        cdctest |userscoressink |                    <NULL> | 2024-01-19 16:50:06.000 |test | 100.0 |
| -U |        cdctest |userscoressink |                    <NULL> | 2024-01-19 16:50:20.000 |test | 100.0 |
| +U |        cdctest |userscoressink |                    <NULL> | 2024-01-19 16:50:20.000 |test | 200.0 |
| -D |        cdctest |userscoressink |                    <NULL> | 2024-01-19 16:50:24.000 |test | 200.0 |
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

4、Format 参数

在这里插入图片描述

5、重要事项:重复的变更事件

Maxwell应用程序允许每次更改事件只投递一次。在这种情况下,Flink在消费Maxwell生产的事件时效果非常好。如果Maxwell应用程序至少在一次交付中工作,它可能会向Kafka交付重复的更改事件,Flink将获得重复的事件。这可能会导致Flink查询得到错误的结果或意外的异常。因此,在这种情况下,建议将作业配置表.exec.source.cdc-events-duplicate设置为true,并在源上定义PRIMARY KEY。框架将生成一个额外的有状态运算符,并使用主键来消除更改事件的重复,并生成一个规范化的更改日志流。

6、数据类型映射

目前,maxwell Format 使用 JSON Format 进行序列化和反序列化。 有关数据类型映射的更多详细信息,请参阅 JSON Format 文档

以上,本文详细的介绍了maxwell的部署、2个示例以及在Flink 中通过maxwell将数据变化信息同步到Kafka中,然后通过Flink SQL client进行读取。