参考了这篇文章《Flink SQL CDC On Hudi》：https://mp.weixin.qq.com/s/duPhdr2zMmUUCYUPATX1oQ
以下是 Flink SQL 作业内容：
-- Change-data-capture source table read through the 'mysql-cdc' connector.
-- It maps the MySQL table `users` in database `flink-test` on localhost:3306.
-- NOTE(review): the root credentials are embedded in plain text below.
CREATE TABLE mysql_users (
    `id`       BIGINT,
    `name`     STRING,
    `birthday` TIMESTAMP(3),
    `ts`       TIMESTAMP(3),
    -- Declared as a table-level constraint; Flink does not enforce it.
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector'        = 'mysql-cdc',
    'hostname'         = 'localhost',
    'port'             = '3306',
    'username'         = 'root',
    'password'         = 'root',
    'server-time-zone' = 'Asia/Shanghai',
    'database-name'    = 'flink-test',
    'table-name'       = 'users'
);
-- Query the CDC source table; this step displays the expected rows.
-- Columns are listed explicitly so the result shape does not silently change
-- if the table definition is edited.
SELECT
    `id`,
    `name`,
    `birthday`,
    `ts`
FROM mysql_users;
-- Hudi MERGE_ON_READ table stored under the OSS path below, partitioned by
-- the `partition` column.
-- NOTE(review): 'read.streaming.enabled' is not set here; presumably queries
-- on this table run as bounded snapshot reads -- confirm against the Hudi
-- Flink connector options.
CREATE TABLE hudi_users (
    `id`        BIGINT,
    `name`      STRING,
    `birthday`  TIMESTAMP(3),
    `ts`        TIMESTAMP(3),
    `partition` VARCHAR(20),
    -- Declared as a table-level constraint; Flink does not enforce it.
    PRIMARY KEY (`id`) NOT ENFORCED
)
PARTITIONED BY (`partition`)
WITH (
    'connector'  = 'hudi',
    'table.type' = 'MERGE_ON_READ',
    'path'       = 'oss://odps-prd/rtdp/hudi/hudi_users'
);
-- SQL Client session settings.
-- 'execution.result-mode' is the legacy YAML-era key, deprecated since
-- Flink 1.13; 'sql-client.execution.result-mode' is its replacement.
SET sql-client.execution.result-mode=tableau;
-- Checkpoint every 10 seconds.
SET execution.checkpointing.interval=10sec;
-- Copy rows from the CDC source into the Hudi table. The partition value is
-- `birthday` rendered with the 'MMdd' pattern.
INSERT INTO hudi_users (`id`, `name`, `birthday`, `ts`, `partition`)
SELECT
    `id`,
    `name`,
    `birthday`,
    `ts`,
    DATE_FORMAT(`birthday`, 'MMdd') AS `partition`
FROM mysql_users;
-- This is the step that fails: the Flink standalone cluster crashes here.
-- Columns are listed explicitly instead of relying on SELECT *.
SELECT
    `id`,
    `name`,
    `birthday`,
    `ts`,
    `partition`
FROM hudi_users;
*** 出错日志 ***
2021-09-24 18:15:23,190 ERROR org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Uncaught exception in the SplitEnumerator for Source Source: TableSourceScan(table=[[default_catalog, default_database, mysql_users]], fields=[id, name, birthday, ts]) -> DropUpdateBefore -> Calc(select=[CAST(id) AS id, name, birthday, ts, CAST(DATE_FORMAT(birthday, _UTF-16LE'MMdd')) AS partition]) -> NotNullEnforcer(fields=[id]) -> Map while starting the SplitEnumerator.. Triggering job failover.
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@875ea31 rejected from java.util.concurrent.ScheduledThreadPoolExecutor@6ac0d459[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063) ~[?:1.8.0_275]
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830) ~[?:1.8.0_275]
    at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326) ~[?:1.8.0_275]
    at java.util.concurrent.ScheduledThreadPoolExecutor.scheduleAtFixedRate(ScheduledThreadPoolExecutor.java:573) ~[?:1.8.0_275]
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.notifyReadyAsync(ExecutorNotifier.java:130) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.callAsync(SourceCoordinatorContext.java:233) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator.start(MySqlSourceEnumerator.java:75) ~[flink-sql-connector-mysql-cdc-2.0.2.jar:2.0.2]
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$start$0(SourceCoordinator.java:132) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$8(SourceCoordinator.java:315) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_275]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_275]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]