Re: Error when ingesting MySQL CDC data into a Hudi lake

2021-09-26 Post by Leonard Xu
Hi, chan

Could you post the complete log? The cause can't be determined from this excerpt alone.



> On Sep 24, 2021, at 18:23, casel.chen wrote:
> 
> SELECT `id`, `name`, `birthday`, `ts`, DATE_FORMAT(`birthday`, 'MMdd') AS `partition` FROM mysql_users;



Error when ingesting MySQL CDC data into a Hudi lake

2021-09-24 Post by casel.chen
I followed this article: https://mp.weixin.qq.com/s/duPhdr2zMmUUCYUPATX1oQ

Flink SQL CDC On Hudi




Here is the Flink SQL job:




CREATE TABLE mysql_users (
  id BIGINT PRIMARY KEY NOT ENFORCED,
  name STRING,
  birthday TIMESTAMP(3),
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = 'root',
  'server-time-zone' = 'Asia/Shanghai',
  'database-name' = 'flink-test',
  'table-name' = 'users'
);




-- Query the CDC source table; this step returns the correct data
SELECT * from mysql_users;




CREATE TABLE hudi_users (
  id BIGINT PRIMARY KEY NOT ENFORCED,
  name STRING,
  birthday TIMESTAMP(3),
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
) PARTITIONED BY (`partition`) WITH (
  'connector' = 'hudi',
  'table.type' = 'MERGE_ON_READ',
  'path' = 'oss://odps-prd/rtdp/hudi/hudi_users'
);




set execution.result-mode=tableau;
set execution.checkpointing.interval=10sec;




INSERT INTO hudi_users(`id`, `name`, `birthday`, `ts`, `partition`)
SELECT `id`, `name`, `birthday`, `ts`, DATE_FORMAT(`birthday`, 'MMdd') AS `partition` FROM mysql_users;
-- This step fails and the Flink standalone cluster crashes



select * from hudi_users;
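Flink's documentation describes the DATE_FORMAT pattern as compatible with Java's SimpleDateFormat, so the `partition` value computed in the INSERT above is a four-character month-day string. A minimal Java sketch of that mapping follows, using java.time.DateTimeFormatter as a stand-in (an assumption: for the plain `MMdd` pattern it produces the same zero-padded output as SimpleDateFormat). The class `PartitionValue` and method `partitionOf` are hypothetical.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: mirrors DATE_FORMAT(`birthday`, 'MMdd') from the INSERT statement.
public class PartitionValue {
    // "MM" = two-digit month of year, "dd" = two-digit day of month; the year is dropped.
    private static final DateTimeFormatter MMDD = DateTimeFormatter.ofPattern("MMdd");

    public static String partitionOf(LocalDateTime birthday) {
        return birthday.format(MMDD);
    }

    public static void main(String[] args) {
        // Birthdays in different years but on the same month and day map to the same partition.
        System.out.println(partitionOf(LocalDateTime.of(1990, 9, 24, 0, 0)));   // 0924
        System.out.println(partitionOf(LocalDateTime.of(2001, 9, 24, 12, 30))); // 0924
        System.out.println(partitionOf(LocalDateTime.of(1985, 1, 5, 8, 0)));    // 0105
    }
}
```

Because the year is not part of the pattern, users born on the same month and day in different years share one Hudi partition, so this table has at most 366 partitions.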





*** Error log ***

2021-09-24 18:15:23,190 ERROR org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Uncaught exception in the SplitEnumerator for Source Source: TableSourceScan(table=[[default_catalog, default_database, mysql_users]], fields=[id, name, birthday, ts]) -> DropUpdateBefore -> Calc(select=[CAST(id) AS id, name, birthday, ts, CAST(DATE_FORMAT(birthday, _UTF-16LE'MMdd')) AS partition]) -> NotNullEnforcer(fields=[id]) -> Map while starting the SplitEnumerator.. Triggering job failover.

java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@875ea31 rejected from java.util.concurrent.ScheduledThreadPoolExecutor@6ac0d459[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
  at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063) ~[?:1.8.0_275]
  at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830) ~[?:1.8.0_275]
  at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326) ~[?:1.8.0_275]
  at java.util.concurrent.ScheduledThreadPoolExecutor.scheduleAtFixedRate(ScheduledThreadPoolExecutor.java:573) ~[?:1.8.0_275]
  at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.notifyReadyAsync(ExecutorNotifier.java:130) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
  at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.callAsync(SourceCoordinatorContext.java:233) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
  at com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator.start(MySqlSourceEnumerator.java:75) ~[flink-sql-connector-mysql-cdc-2.0.2.jar:2.0.2]
  at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$start$0(SourceCoordinator.java:132) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
  at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$8(SourceCoordinator.java:315) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_275]
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_275]
  at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
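The top of this stack trace shows MySqlSourceEnumerator.start() reaching ScheduledThreadPoolExecutor.scheduleAtFixedRate() on an executor whose state is already [Terminated]. The minimal Java sketch below reproduces only that JDK behavior; the class `RejectedScheduleDemo` is hypothetical and has no Flink dependency. It shows that a terminated ScheduledThreadPoolExecutor rejects new periodic tasks through its default AbortPolicy. It does not show why the Flink coordinator's executor was already shut down when the enumerator started; that may be explained by an earlier error in the complete log.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo (no Flink dependency): reproduces the JDK frames at the top of the trace,
// scheduleAtFixedRate -> delayedExecute -> reject -> AbortPolicy.rejectedExecution.
public class RejectedScheduleDemo {

    /** Schedules a periodic task on a terminated executor; returns the rejection message, or null if accepted. */
    public static String scheduleOnTerminatedExecutor() {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); // default handler: AbortPolicy
        executor.shutdown(); // no worker thread was ever started, so the pool can terminate right away
        try {
            executor.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
        try {
            executor.scheduleAtFixedRate(() -> { }, 0L, 100L, TimeUnit.MILLISECONDS);
            return null; // not expected: a shut-down executor rejects new tasks
        } catch (RejectedExecutionException e) {
            return e.getMessage(); // "Task ... rejected from ...ScheduledThreadPoolExecutor@...[Terminated, ...]"
        }
    }

    public static void main(String[] args) {
        System.out.println(scheduleOnTerminatedExecutor());
    }
}
```

The printed message contains `rejected from` followed by the executor's `[Terminated, pool size = 0, ...]` state, the same shape as the exception in the log above.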