本文共 4602 字,大约阅读时间需要 15 分钟。
在开始使用Flink CDC捕获MySQL变更数据之前,需要先准备好MySQL数据库。以下是具体的操作步骤:
-- Create the demo database and switch to it.
create database if not exists test;
use test;

-- Recreate the demo table from scratch; the primary key is declared on id.
drop table if exists stu;
create table stu (
    id int primary key auto_increment,
    name varchar(100),
    age int
);

-- Seed rows. String literals use single quotes so the statements also run
-- when sql_mode contains ANSI_QUOTES (double quotes then denote identifiers).
insert into stu(name, age) values('张三', 18);
insert into stu(name, age) values('李四', 20);
insert into stu(name, age) values('王五', 21);
注意事项:确保表中有主键,否则Flink CDC可能无法正常工作。
为了实现Flink CDC对MySQL数据库的变更数据实时捕获,需要先开启MySQL的二进制日志。
sudo vim /etc/my.cnf
server-id = 1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=test
注意事项:启用binlog的数据库需要根据实际情况调整设置,确保二进制日志文件路径和权限正确。
sudo systemctl restart mysqld
本节将介绍如何使用Flink CDC从MySQL数据库实时捕获增删改数据。
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>2.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>${flink.version}</version>
</dependency>
import com.ververica.cdc.connectors.mysql.source.MySqlSource;import com.ververica.cdc.connectors.mysql.table.StartupOptions;import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;import org.apache.flink.api.common.eventtime.WatermarkStrategy;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class FlinkCDCDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(4); MySqlSourcemySqlSource = MySqlSource.builder() .hostname("node4") .port(3306) .username("root") .password("000000") .databaseList("test") .tableList("test.stu") .deserializer(new JsonDebeziumDeserializationSchema()) .startupOptions(StartupOptions.initial()) .build(); DataStreamSource dataStreamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql_source") .setParallelism(1); dataStreamSource.print(); env.execute(); }}
注意事项:确保MySQL的binlog已经启用,并且Flink运行环境的版本与依赖版本匹配。
执行以下SQL语句:
mysql> insert into stu(name, age) values("赵六", 23);
输出示例:
{ "before": null, "after": { "id": 4, "name": "赵六", "age": 23 }, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1719831654000, "snapshot": "false", "db": "test", "sequence": null, "table": "stu", "server_id": 1, "gtid": null, "file": "mysql-bin.000001", "pos": 2300, "row": 0, "thread": 13, "query": null }, "op": "c", "ts_ms": 1719831654692, "transaction": null}
执行以下SQL语句:
mysql> update stu set name="zl", age=19 where name="赵六";
输出示例:
{ "before": { "id": 4, "name": "赵六", "age": 23 }, "after": { "id": 4, "name": "zl", "age": 19 }, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1719831987000, "snapshot": "false", "db": "test", "sequence": null, "table": "stu", "server_id": 1, "gtid": null, "file": "mysql-bin.000001", "pos": 2604, "row": 0, "thread": 13, "query": null }, "op": "u", "ts_ms": 1719831987238, "transaction": null}
执行以下SQL语句:
mysql> delete from stu where id=4;
输出示例:
{ "before": { "id": 4, "name": "zl", "age": 19 }, "after": null, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1719832151000, "snapshot": "false", "db": "test", "sequence": null, "table": "stu", "server_id": 1, "gtid": null, "file": "mysql-bin.000001", "pos": 2913, "row": 0, "thread": 13, "query": null }, "op": "d", "ts_ms": 1719832151198, "transaction": null}
注意事项:通过IDEA控制台可以实时查看Flink程序的输出日志,确保数据捕获和处理过程中没有错误发生。
转载地址:http://iaqbz.baihongyu.com/