flink 1.11.2 创建hive表的问题
大佬好,我在使用create table if not exists创建hive表时,对于已存在的hive表,在hive的日志中会抛出AlreadyExistsException(message:Table bm_tsk_001 already exists异常,查看源码发现if not exists貌似只是用于判断捕获异常后是否抛出,对于这个问题有建议的解决方案嘛? -- Sent from: http://apache-flink.147419.n8.nabble.com/
flink 1.11.2 创建hive表的问题
大佬好,我在使用create table if not exists创建hive表时,对于已存在的hive表,在hive的日志中会抛出AlreadyExistsException(message:Table bm_tsk_001 already exists异常,查看源码发现if not exists貌似只是用于判断捕获异常后是否抛出,对于这个问题有建议的解决方案嘛? -- Sent from: http://apache-flink.147419.n8.nabble.com/
flink 1.11.2运行时出错
flink on yarn 模式,任务在yarn中跑了两天后出错,错误信息如下: org.apache.flink.util.FlinkException: JobManager responsible for fa0e8f776be3b5cd6573e1922da67c1f lost the leadership. at org.apache.flink.runtime.taskexecutor.TaskExecutor.disconnectJobManagerConnection(TaskExecutor.java:1415) [DataHub-2.0-SNAPSHOT.jar:na] at org.apache.flink.runtime.taskexecutor.TaskExecutor.access$1300(TaskExecutor.java:173) [DataHub-2.0-SNAPSHOT.jar:na] at org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$null$2(TaskExecutor.java:1852) [DataHub-2.0-SNAPSHOT.jar:na] at java.util.Optional.ifPresent(Optional.java:159) ~[na:1.8.0_181] at org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$jobManagerLostLeadership$3(TaskExecutor.java:1851) [DataHub-2.0-SNAPSHOT.jar:na] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402) ~[DataHub-2.0-SNAPSHOT.jar:na] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195) ~[DataHub-2.0-SNAPSHOT.jar:na] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) ~[DataHub-2.0-SNAPSHOT.jar:na] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[DataHub-2.0-SNAPSHOT.jar:na] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[DataHub-2.0-SNAPSHOT.jar:na] at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~[DataHub-2.0-SNAPSHOT.jar:na] at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[DataHub-2.0-SNAPSHOT.jar:na] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) ~[DataHub-2.0-SNAPSHOT.jar:na] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[DataHub-2.0-SNAPSHOT.jar:na] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[DataHub-2.0-SNAPSHOT.jar:na] at akka.actor.Actor$class.aroundReceive(Actor.scala:517) ~[DataHub-2.0-SNAPSHOT.jar:na] at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[DataHub-2.0-SNAPSHOT.jar:na] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[DataHub-2.0-SNAPSHOT.jar:na] at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[DataHub-2.0-SNAPSHOT.jar:na] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[DataHub-2.0-SNAPSHOT.jar:na] at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[DataHub-2.0-SNAPSHOT.jar:na] at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[DataHub-2.0-SNAPSHOT.jar:na] at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[DataHub-2.0-SNAPSHOT.jar:na] at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[DataHub-2.0-SNAPSHOT.jar:na] at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~[DataHub-2.0-SNAPSHOT.jar:na] at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~[DataHub-2.0-SNAPSHOT.jar:na] Caused by: java.lang.Exception: Job leader for job id fa0e8f776be3b5cd6573e1922da67c1f lost leadership. ... 24 common frames omitted -- Sent from: http://apache-flink.147419.n8.nabble.com/
flink 1.11 ddl 写mysql的问题
我使用fink 1.11.1 做cdc,发现一秒钟只能写100条左右数据到mysql,请问有优化方案,或者是其他的批量写入的方案建议嘛 代码如下: String sourceDdl =" CREATE TABLE debezium_source " + "( " + "id STRING NOT NULL, name STRING, description STRING, weight Double" + ") " + "WITH (" + " 'connector' = 'kafka-0.11'," + " 'topic' = 'test0717'," + " 'properties.bootstrap.servers' = ' 172.22.20.206:9092', " + "'scan.startup.mode' = 'group-offsets','properties.group.id'='test'," + "'format' = 'debezium-json'," + "'debezium-json.schema-include'='false'," + "'debezium-json.ignore-parse-errors'='true')"; tEnv.executeSql(sourceDdl); System.out.println("init source ddl successful ==>" + sourceDdl); String sinkDdl = " CREATE TABLE sink " + "( " + "id STRING NOT NULL," + " name STRING, " + "description STRING," + " weight Double," + " PRIMARY KEY (id) NOT ENFORCED " + ")" + " WITH " + "( " + "'connector' = 'jdbc', " + "'url' = 'jdbc:mysql://127.0.0.1:3306/test?autoReconnect=true', " + "'table-name' = 'table-out', " + "'driver'= 'com.mysql.cj.jdbc.Driver'," + "'sink.buffer-flush.interval'='1s'," + "'sink.buffer-flush.max-rows'='1000'," + "'username'='DataPip', " + "'password'='DataPip')"; tEnv.executeSql(sinkDdl); System.out.println("init sink ddl successful ==>" + sinkDdl); String dml = "INSERT INTO sink SELECT id,name ,description, weight FROM debezium_source"; System.out.println("execute dml ==>" + dml); tEnv.executeSql(dml); tEnv.executeSql("CREATE TABLE print_table WITH ('connector' = 'print')" + "LIKE debezium_source (EXCLUDING ALL)"); tEnv.executeSql("INSERT INTO print_table SELECT id,name ,description, weight FROM debezium_source"); -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink 1.11 checkpoint使用
感觉好像是应为从checkpoint启动失败或者是checkpiont文件里面不包含groupby的中间结果,这个怎么排查呀! godfrey he wrote > 为什么要 GROUP BY id,name ,description, weight ? > 直接 "INSERT INTO sink SELECT id,name ,description, weight FROM > debezium_source" 不能满足需求? > > 曹武 < > 14701319164@ >> 于2020年7月16日周四 下午9:30写道: > >> 我在使用flink 1.11.0中得ddl 部分 采用debezium-json做cdc得时候 >> 从checkpoint恢复以后,新来op=d的数据会删除失败 >> 重启命令:./bin/flink run -m yarn-cluster /root/bigdata-flink-1.0.jar -s >> >> hdfs://prehadoop01:8020/flink/checkpoints/4cc5df8b96e90c1c2a4d3719a77f51d1/chk-819/_metadata >> 代码: EnvironmentSettings settings = EnvironmentSettings.newInstance() >> .useBlinkPlanner() >> .inStreamingMode() >> .build(); >> >> StreamExecutionEnvironment env = >> StreamExecutionEnvironment.getExecutionEnvironment(); >> >> env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE); >> env.getCheckpointConfig().setCheckpointTimeout(6000L); // 超时时间 >> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // >> 最大允许同时出现几个CheckPoint >> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10L); // >> 最小得间隔时间 >> env.getCheckpointConfig().setPreferCheckpointForRecovery(true); >> // >> 是否倾向于用CheckPoint做故障恢复 >> env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1); >> // >> 容忍多少次CheckPoint失败 >> //Checkpoint文件清理策略 >> >> >> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); >> //Checkpoint外部文件路径 >> env.setStateBackend(new FsStateBackend(new >> URI("hdfs://172.22.20.205:8020/flink/checkpoints"), false)); >> TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS))); >> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, >> settings); >> String sourceDDL = String.format( >> "CREATE TABLE debezium_source (" + >> " id INT NOT NULL," + >> " name STRING," + >> " description STRING," + >> " weight Double" + >> ") WITH (" + >> " 'connector' = 'kafka-0.11'," + >> " 'topic' = '%s'," + >> " 'properties.bootstrap.servers' = '%s'," + >> " 'scan.startup.mode' = 'group-offsets'," + >> " 'format' = 'debezium-json'" + >> ")", "ddd", " 172.22.20.206:9092"); >> String sinkDDL = "CREATE TABLE sink (" + >> " id INT NOT NULL," + >> " name STRING," + >> " description STRING," + >> " weight Double," + >> " PRIMARY KEY (id,name, description,weight) NOT ENFORCED >> " >> + >> ") WITH (" + >> " 'connector' = 'jdbc'," + >> " 'url' = >> 'jdbc:mysql://172.27.4.22:3306/test?autoReconnect=true'," + >> " 'table-name' = 'products'," + >> " 'driver'= 'com.mysql.cj.jdbc.Driver'," + >> " 'username'='DataPip'," + >> " 'password'='DataPip'" + >> ")"; >> String dml = "INSERT INTO sink SELECT id,name ,description, >> weight >> FROM debezium_source GROUP BY id,name ,description, weight"; >> tEnv.executeSql(sourceDDL); >> tEnv.executeSql(sinkDDL); >> tEnv.executeSql(dml); >> >> >> >> -- >> Sent from: http://apache-flink.147419.n8.nabble.com/ >> -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink 1.11 checkpoint使用
如果去掉group by会抛出异常,请问有没有关这个异常的解决方式: Exception in thread "main" org.apache.flink.table.api.TableException: Provided trait [BEFORE_AND_AFTER] can't satisfy required trait [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue. Current node is TableSourceScan(table=[[default_catalog, default_database, ddd]], fields=[id, age]) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.$anonfun$visitChildren$2(FlinkChangelogModeInferenceProgram.scala:626) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) at scala.collection.Iterator.foreach(Iterator.scala:937) at scala.collection.Iterator.foreach$(Iterator.scala:937) at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at scala.collection.IterableLike.foreach(IterableLike.scala:70) at scala.collection.IterableLike.foreach$(IterableLike.scala:69) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike.map(TraversableLike.scala:233) at scala.collection.TraversableLike.map$(TraversableLike.scala:226) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:614) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.$anonfun$visitSink$1(FlinkChangelogModeInferenceProgram.scala:690) at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:240) godfrey he wrote > 为什么要 GROUP BY id,name ,description, weight ? > 直接 "INSERT INTO sink SELECT id,name ,description, weight FROM > debezium_source" 不能满足需求? > > 曹武 < > 14701319164@ >> 于2020年7月16日周四 下午9:30写道: > >> 我在使用flink 1.11.0中得ddl 部分 采用debezium-json做cdc得时候 >> 从checkpoint恢复以后,新来op=d的数据会删除失败 >> 重启命令:./bin/flink run -m yarn-cluster /root/bigdata-flink-1.0.jar -s >> >> hdfs://prehadoop01:8020/flink/checkpoints/4cc5df8b96e90c1c2a4d3719a77f51d1/chk-819/_metadata >> 代码: EnvironmentSettings settings = EnvironmentSettings.newInstance() >> .useBlinkPlanner() >> .inStreamingMode() >> .build(); >> >> StreamExecutionEnvironment env = >> StreamExecutionEnvironment.getExecutionEnvironment(); >> >> env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE); >> env.getCheckpointConfig().setCheckpointTimeout(6000L); // 超时时间 >> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // >> 最大允许同时出现几个CheckPoint >> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10L); // >> 最小得间隔时间 >> env.getCheckpointConfig().setPreferCheckpointForRecovery(true); >> // >> 是否倾向于用CheckPoint做故障恢复 >> env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1); >> // >> 容忍多少次CheckPoint失败 >> //Checkpoint文件清理策略 >> >> >> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); >> //Checkpoint外部文件路径 >> env.setStateBackend(new FsStateBackend(new >> URI("hdfs://172.22.20.205:8020/flink/checkpoints"), false)); >> TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS))); >> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, >> settings); >> String sourceDDL = String.format( >> "CREATE TABLE debezium_source (" + >> " id INT NOT NULL," + >> " name STRING," + >> " description STRING," + >> " weight Double" + >> ") WITH (" + >> " 'connector' = 'kafka-0.11'," + >> " 'topic' = '%s'," + >> " 'properties.bootstrap.servers' = '%s'," + >> " 'scan.startup.mode' = 'group-offsets'," + >> " 'format' = 'debezium-json'" + >> ")", "ddd", " 172.22.20.206:9092"); >> String sinkDDL = "CREATE TABLE sink (" + >> " id INT NOT NULL," + >> " name STRING," + >> " description STRING," + >> " weight Double," + >&
Re: flink 1.11 checkpoint使用
如果去掉group by会抛出异常,请问有没有关这个异常的解决方式: Exception in thread "main" org.apache.flink.table.api.TableException: Provided trait [BEFORE_AND_AFTER] can't satisfy required trait [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue. Current node is TableSourceScan(table=[[default_catalog, default_database, ddd]], fields=[id, age]) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.$anonfun$visitChildren$2(FlinkChangelogModeInferenceProgram.scala:626) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) at scala.collection.Iterator.foreach(Iterator.scala:937) at scala.collection.Iterator.foreach$(Iterator.scala:937) at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at scala.collection.IterableLike.foreach(IterableLike.scala:70) at scala.collection.IterableLike.foreach$(IterableLike.scala:69) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike.map(TraversableLike.scala:233) at scala.collection.TraversableLike.map$(TraversableLike.scala:226) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:614) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.$anonfun$visitSink$1(FlinkChangelogModeInferenceProgram.scala:690) at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:240) Jark wrote > Hi, > > 能确认一下 kafka 中有完整的全量数据吗? 也就是 这个 DELETE 消息之前,有对应的 INSERT 消息吗? > 如果没有的话,是可能会发生这个现象的(DELETE 在 group by 节点会被认为脏数据而丢掉)。 > 当然也可以像 godfrey 建议的那样,不 groupby,直接全部字段 INSERT INTO sink,DELETE 就不会被丢弃掉。 > > Best, > Jark > > On Thu, 16 Jul 2020 at 21:56, godfrey he > godfreyhe@ > wrote: > >> 为什么要 GROUP BY id,name ,description, weight ? >> 直接 "INSERT INTO sink SELECT id,name ,description, weight FROM >> debezium_source" 不能满足需求? >> >> 曹武 < > 14701319164@ >> 于2020年7月16日周四 下午9:30写道: >> >> > 我在使用flink 1.11.0中得ddl 部分 采用debezium-json做cdc得时候 >> > 从checkpoint恢复以后,新来op=d的数据会删除失败 >> > 重启命令:./bin/flink run -m yarn-cluster /root/bigdata-flink-1.0.jar -s >> > >> > >> hdfs://prehadoop01:8020/flink/checkpoints/4cc5df8b96e90c1c2a4d3719a77f51d1/chk-819/_metadata >> > 代码: EnvironmentSettings settings = EnvironmentSettings.newInstance() >> > .useBlinkPlanner() >> > .inStreamingMode() >> > .build(); >> > >> > StreamExecutionEnvironment env = >> > StreamExecutionEnvironment.getExecutionEnvironment(); >> > >> > env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE); >> > env.getCheckpointConfig().setCheckpointTimeout(6000L); // 超时时间 >> > env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // >> > 最大允许同时出现几个CheckPoint >> > env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10L); >> // >> > 最小得间隔时间 >> > env.getCheckpointConfig().setPreferCheckpointForRecovery(true); >> // >> > 是否倾向于用CheckPoint做故障恢复 >> > >> env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1); >> > // >> > 容忍多少次CheckPoint失败 >> > //Checkpoint文件清理策略 >> > >> > >> > >> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); >> > //Checkpoint外部文件路径 >> > env.setStateBackend(new FsStateBackend(new >> > URI("hdfs://172.22.20.205:8020/flink/checkpoints"), false)); >> > TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS))); >> > StreamTableEnvironment tEnv = >> StreamTableEnvironment.create(env, >> > settings); >> > String sourceDDL = String.format( >> > "CREATE TABLE debezium_source (" + >> > " id INT NOT NULL," + >> > " name STRING," + >> > " description STRING," + >> > " weight Double" + >> > ") WITH (" + >> > " 'connector' = 'kafka-0.11'," + >> > " 'topic' = '%s'," + >> >
flink 1.11 checkpoint使用
我在使用flink 1.11.0中得ddl 部分 采用debezium-json做cdc得时候 从checkpoint恢复以后,新来op=d的数据会删除失败 重启命令:./bin/flink run -m yarn-cluster /root/bigdata-flink-1.0.jar -s hdfs://prehadoop01:8020/flink/checkpoints/4cc5df8b96e90c1c2a4d3719a77f51d1/chk-819/_metadata 代码: EnvironmentSettings settings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointTimeout(6000L); // 超时时间 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 最大允许同时出现几个CheckPoint env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10L); // 最小得间隔时间 env.getCheckpointConfig().setPreferCheckpointForRecovery(true); // 是否倾向于用CheckPoint做故障恢复 env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1); // 容忍多少次CheckPoint失败 //Checkpoint文件清理策略 env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); //Checkpoint外部文件路径 env.setStateBackend(new FsStateBackend(new URI("hdfs://172.22.20.205:8020/flink/checkpoints"), false)); TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS))); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings); String sourceDDL = String.format( "CREATE TABLE debezium_source (" + " id INT NOT NULL," + " name STRING," + " description STRING," + " weight Double" + ") WITH (" + " 'connector' = 'kafka-0.11'," + " 'topic' = '%s'," + " 'properties.bootstrap.servers' = '%s'," + " 'scan.startup.mode' = 'group-offsets'," + " 'format' = 'debezium-json'" + ")", "ddd", " 172.22.20.206:9092"); String sinkDDL = "CREATE TABLE sink (" + " id INT NOT NULL," + " name STRING," + " description STRING," + " weight Double," + " PRIMARY KEY (id,name, description,weight) NOT ENFORCED " + ") WITH (" + " 'connector' = 'jdbc'," + " 'url' = 'jdbc:mysql://172.27.4.22:3306/test?autoReconnect=true'," + " 'table-name' = 'products'," + " 'driver'= 'com.mysql.cj.jdbc.Driver'," + " 'username'='DataPip'," + " 'password'='DataPip'" + ")"; String dml = "INSERT INTO sink SELECT id,name ,description, weight FROM debezium_source GROUP BY id,name ,description, weight"; tEnv.executeSql(sourceDDL); tEnv.executeSql(sinkDDL); tEnv.executeSql(dml); -- Sent from: http://apache-flink.147419.n8.nabble.com/
flink使用debezium-json format报错
log4j:WARN No appenders could be found for logger (org.apache.flink.table.module.ModuleManager). log4j:WARN Please initialize the log4j system properly. Exception in thread "main" org.apache.flink.table.api.TableException: Provided trait [BEFORE_AND_AFTER] can't satisfy required trait [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue. Current node is TableSourceScan(table=[[default_catalog, default_database, ddd]], fields=[id, age]) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.$anonfun$visitChildren$2(FlinkChangelogModeInferenceProgram.scala:626) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) at scala.collection.Iterator.foreach(Iterator.scala:937) at scala.collection.Iterator.foreach$(Iterator.scala:937) at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at scala.collection.IterableLike.foreach(IterableLike.scala:70) at scala.collection.IterableLike.foreach$(IterableLike.scala:69) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike.map(TraversableLike.scala:233) at scala.collection.TraversableLike.map$(TraversableLike.scala:226) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:614) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.$anonfun$visitSink$1(FlinkChangelogModeInferenceProgram.scala:690) at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:240) -- Sent from: http://apache-flink.147419.n8.nabble.com/