flink 1.11.2: problem creating Hive tables

2020-12-21 Thread
Hi all, when I create a Hive table with CREATE TABLE IF NOT EXISTS, for a table that
already exists the Hive logs still show an AlreadyExistsException (message: Table
bm_tsk_001 already exists). Looking at the source code, IF NOT EXISTS seems to be used
only to decide whether the caught exception is re-thrown. Is there a recommended way to deal with this?
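
A possible workaround (a sketch, not from this thread): ask the catalog whether the table
already exists and only issue the DDL when it does not, so the Hive metastore never has to
raise and log the AlreadyExistsException. Here tEnv is assumed to be a TableEnvironment
with the HiveCatalog already registered under the placeholder name "myhive", and
"default"/"bm_tsk_001" stand in for the real database and table.

import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.ObjectPath;

// Look up the registered Hive catalog ("myhive" is a placeholder name).
Catalog hiveCatalog = tEnv.getCatalog("myhive").orElseThrow(IllegalStateException::new);
ObjectPath tablePath = new ObjectPath("default", "bm_tsk_001");
if (!hiveCatalog.tableExists(tablePath)) {
    // Issue the CREATE TABLE only when the metastore does not know the table yet.
    tEnv.executeSql("CREATE TABLE bm_tsk_001 (...)"); // replace (...) with the real schema
}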





flink 1.11.2 runtime error

2020-11-15 Thread
Flink on YARN mode: the job failed after running on YARN for two days. The error is as follows:
org.apache.flink.util.FlinkException: JobManager responsible for fa0e8f776be3b5cd6573e1922da67c1f lost the leadership.
at org.apache.flink.runtime.taskexecutor.TaskExecutor.disconnectJobManagerConnection(TaskExecutor.java:1415) [DataHub-2.0-SNAPSHOT.jar:na]
at org.apache.flink.runtime.taskexecutor.TaskExecutor.access$1300(TaskExecutor.java:173) [DataHub-2.0-SNAPSHOT.jar:na]
at org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$null$2(TaskExecutor.java:1852) [DataHub-2.0-SNAPSHOT.jar:na]
at java.util.Optional.ifPresent(Optional.java:159) ~[na:1.8.0_181]
at org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$jobManagerLostLeadership$3(TaskExecutor.java:1851) [DataHub-2.0-SNAPSHOT.jar:na]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402) ~[DataHub-2.0-SNAPSHOT.jar:na]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195) ~[DataHub-2.0-SNAPSHOT.jar:na]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) ~[DataHub-2.0-SNAPSHOT.jar:na]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[DataHub-2.0-SNAPSHOT.jar:na]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[DataHub-2.0-SNAPSHOT.jar:na]
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~[DataHub-2.0-SNAPSHOT.jar:na]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[DataHub-2.0-SNAPSHOT.jar:na]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) ~[DataHub-2.0-SNAPSHOT.jar:na]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[DataHub-2.0-SNAPSHOT.jar:na]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[DataHub-2.0-SNAPSHOT.jar:na]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517) ~[DataHub-2.0-SNAPSHOT.jar:na]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[DataHub-2.0-SNAPSHOT.jar:na]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[DataHub-2.0-SNAPSHOT.jar:na]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[DataHub-2.0-SNAPSHOT.jar:na]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[DataHub-2.0-SNAPSHOT.jar:na]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[DataHub-2.0-SNAPSHOT.jar:na]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[DataHub-2.0-SNAPSHOT.jar:na]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[DataHub-2.0-SNAPSHOT.jar:na]
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[DataHub-2.0-SNAPSHOT.jar:na]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~[DataHub-2.0-SNAPSHOT.jar:na]
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~[DataHub-2.0-SNAPSHOT.jar:na]
Caused by: java.lang.Exception: Job leader for job id fa0e8f776be3b5cd6573e1922da67c1f lost leadership.
... 24 common frames omitted







flink 1.11 DDL: problem writing to MySQL

2020-07-23 Thread
I am using Flink 1.11.1 for CDC and find that only about 100 rows per second are written to MySQL.
Is there a way to tune this, or are there other suggestions for writing in batches?
The code is as follows:
String sourceDdl = "CREATE TABLE debezium_source (" +
        " id STRING NOT NULL, name STRING, description STRING, weight Double" +
        ") WITH (" +
        " 'connector' = 'kafka-0.11'," +
        " 'topic' = 'test0717'," +
        " 'properties.bootstrap.servers' = ' 172.22.20.206:9092'," +
        " 'scan.startup.mode' = 'group-offsets'," +
        " 'properties.group.id' = 'test'," +
        " 'format' = 'debezium-json'," +
        " 'debezium-json.schema-include' = 'false'," +
        " 'debezium-json.ignore-parse-errors' = 'true')";
tEnv.executeSql(sourceDdl);
System.out.println("init source ddl successful ==>" + sourceDdl);

String sinkDdl = "CREATE TABLE sink (" +
        " id STRING NOT NULL," +
        " name STRING," +
        " description STRING," +
        " weight Double," +
        " PRIMARY KEY (id) NOT ENFORCED" +
        ") WITH (" +
        " 'connector' = 'jdbc'," +
        " 'url' = 'jdbc:mysql://127.0.0.1:3306/test?autoReconnect=true'," +
        " 'table-name' = 'table-out'," +
        " 'driver' = 'com.mysql.cj.jdbc.Driver'," +
        " 'sink.buffer-flush.interval' = '1s'," +
        " 'sink.buffer-flush.max-rows' = '1000'," +
        " 'username' = 'DataPip'," +
        " 'password' = 'DataPip')";
tEnv.executeSql(sinkDdl);
System.out.println("init sink ddl successful ==>" + sinkDdl);

String dml = "INSERT INTO sink SELECT id, name, description, weight FROM debezium_source";
System.out.println("execute dml ==>" + dml);
tEnv.executeSql(dml);

tEnv.executeSql("CREATE TABLE print_table WITH ('connector' = 'print') " +
        "LIKE debezium_source (EXCLUDING ALL)");
tEnv.executeSql("INSERT INTO print_table SELECT id, name, description, weight FROM debezium_source");
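
A hedged tuning sketch (not a confirmed fix from this thread): the sink already buffers rows via
'sink.buffer-flush.max-rows' / 'sink.buffer-flush.interval', but by default MySQL Connector/J still
sends each buffered statement to the server individually. Adding the standard Connector/J option
rewriteBatchedStatements=true to the JDBC URL lets the driver rewrite the buffered batch into
multi-row statements, which often raises write throughput noticeably. Host, database and credentials
below are the ones from the snippet above.

// Same sink definition as above, only the 'url' option changes (sketch).
String batchedUrl =
        "jdbc:mysql://127.0.0.1:3306/test?autoReconnect=true&rewriteBatchedStatements=true";
// Use this value for the 'url' option of the JDBC sink table defined above.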





Re: flink 1.11 checkpoint usage

2020-07-17 Thread
It seems that either the restore from the checkpoint failed, or the checkpoint files do not contain the intermediate GROUP BY results. How can I troubleshoot this?

godfrey he wrote
> Why do you GROUP BY id, name, description, weight?
> Wouldn't a plain "INSERT INTO sink SELECT  id,name ,description, weight FROM
> debezium_source" meet your needs?
> 
> 曹武 <14701319164@> wrote on Thu, Jul 16, 2020 at 9:30 PM:
> 
>> When using the DDL part of Flink 1.11.0 with debezium-json for CDC,
>> after restoring from a checkpoint, newly arriving op=d records fail to be deleted.
>> Restart command: ./bin/flink run -m yarn-cluster  /root/bigdata-flink-1.0.jar -s
>>
>> hdfs://prehadoop01:8020/flink/checkpoints/4cc5df8b96e90c1c2a4d3719a77f51d1/chk-819/_metadata
>> Code:   EnvironmentSettings settings = EnvironmentSettings.newInstance()
>> .useBlinkPlanner()
>> .inStreamingMode()
>> .build();
>>
>> StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>>
>> env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
>> env.getCheckpointConfig().setCheckpointTimeout(6000L); // checkpoint timeout
>> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // max concurrent checkpoints
>> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10L); // minimum pause between checkpoints
>> env.getCheckpointConfig().setPreferCheckpointForRecovery(true); // prefer checkpoints for recovery
>> env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1); // tolerable checkpoint failures
>> // checkpoint retention policy
>> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>> // externalized checkpoint path
>> env.setStateBackend(new FsStateBackend(new URI("hdfs://172.22.20.205:8020/flink/checkpoints"), false));
>> TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)));
>> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,
>> settings);
>> String sourceDDL = String.format(
>> "CREATE TABLE debezium_source (" +
>> " id INT NOT NULL," +
>> " name STRING," +
>> " description STRING," +
>> " weight Double" +
>> ") WITH (" +
>> " 'connector' = 'kafka-0.11'," +
>> " 'topic' = '%s'," +
>> " 'properties.bootstrap.servers' = '%s'," +
>> " 'scan.startup.mode' = 'group-offsets'," +
>> " 'format' = 'debezium-json'" +
>> ")", "ddd", " 172.22.20.206:9092");
>> String sinkDDL = "CREATE TABLE sink (" +
>> " id INT NOT NULL," +
>> " name STRING," +
>> " description STRING," +
>> " weight Double," +
>> " PRIMARY KEY (id,name, description,weight) NOT ENFORCED
>> "
>> +
>> ") WITH (" +
>> " 'connector' = 'jdbc'," +
>> " 'url' =
>> 'jdbc:mysql://172.27.4.22:3306/test?autoReconnect=true'," +
>> " 'table-name' = 'products'," +
>> " 'driver'= 'com.mysql.cj.jdbc.Driver'," +
>> " 'username'='DataPip'," +
>> " 'password'='DataPip'" +
>> ")";
>> String dml = "INSERT INTO sink SELECT  id,name ,description,
>> weight
>> FROM debezium_source GROUP BY id,name ,description, weight";
>> tEnv.executeSql(sourceDDL);
>> tEnv.executeSql(sinkDDL);
>> tEnv.executeSql(dml);
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>







Re: flink 1.11 checkpoint usage

2020-07-17 Thread
If I remove the GROUP BY, the following exception is thrown. Is there a known way to resolve it?
Exception in thread "main" org.apache.flink.table.api.TableException: Provided trait [BEFORE_AND_AFTER] can't satisfy required trait [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue.
Current node is TableSourceScan(table=[[default_catalog, default_database, ddd]], fields=[id, age])
at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.$anonfun$visitChildren$2(FlinkChangelogModeInferenceProgram.scala:626)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:614)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.$anonfun$visitSink$1(FlinkChangelogModeInferenceProgram.scala:690)
at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:240)






Jark wrote
> Hi,
> 
> Can you confirm that Kafka contains the complete full snapshot of the data? That is, before this DELETE message, is there a corresponding INSERT message?
> If not, this behavior can indeed happen (the DELETE is treated as dirty data at the group by node and dropped).
> Of course, as godfrey suggested, you can also skip the GROUP BY and INSERT INTO sink with all columns directly; then the DELETE will not be discarded.
> 
> Best,
> Jark
> 
> On Thu, 16 Jul 2020 at 21:56, godfrey he <godfreyhe@> wrote:
> 
>> Why do you GROUP BY id, name, description, weight?
>> Wouldn't a plain "INSERT INTO sink SELECT  id,name ,description, weight FROM
>> debezium_source" meet your needs?
>>
>> 曹武 <14701319164@> wrote on Thu, Jul 16, 2020 at 9:30 PM:
>>
>> > When using the DDL part of Flink 1.11.0 with debezium-json for CDC,
>> > after restoring from a checkpoint, newly arriving op=d records fail to be deleted.
>> > Restart command: ./bin/flink run -m yarn-cluster  /root/bigdata-flink-1.0.jar -s
>> >
>> >
>> hdfs://prehadoop01:8020/flink/checkpoints/4cc5df8b96e90c1c2a4d3719a77f51d1/chk-819/_metadata
>> > Code:   EnvironmentSettings settings = EnvironmentSettings.newInstance()
>> > .useBlinkPlanner()
>> > .inStreamingMode()
>> > .build();
>> >
>> > StreamExecutionEnvironment env =
>> > StreamExecutionEnvironment.getExecutionEnvironment();
>> >
>> > env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
>> > env.getCheckpointConfig().setCheckpointTimeout(6000L); // checkpoint timeout
>> > env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // max concurrent checkpoints
>> > env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10L); // minimum pause between checkpoints
>> > env.getCheckpointConfig().setPreferCheckpointForRecovery(true); // prefer checkpoints for recovery
>> > env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1); // tolerable checkpoint failures
>> > // checkpoint retention policy
>> >
>> >
>> >
>> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>> > // externalized checkpoint path
>> > env.setStateBackend(new FsStateBackend(new
>> > URI("hdfs://172.22.20.205:8020/flink/checkpoints"), false));
>> > TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)));
>> > StreamTableEnvironment tEnv =
>> StreamTableEnvironment.create(env,
>> > settings);
>> > String sourceDDL = String.format(
>> > "CREATE TABLE debezium_source (" +
>> > " id INT NOT NULL," +
>> > " name STRING," +
>> > " description STRING," +
>> > " weight Double" +
>> > ") WITH (" +
>> > " 'connector' = 'kafka-0.11'," +
>> > " 'topic' = '%s'," +
>> >  

flink 1.11 checkpoint usage

2020-07-16 Thread
When using the DDL part of Flink 1.11.0 with debezium-json for CDC, after restoring from a checkpoint, newly arriving op=d records fail to be deleted.
Restart command: ./bin/flink run -m yarn-cluster  /root/bigdata-flink-1.0.jar -s
hdfs://prehadoop01:8020/flink/checkpoints/4cc5df8b96e90c1c2a4d3719a77f51d1/chk-819/_metadata
Code:
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(6000L); // checkpoint timeout
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // max concurrent checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10L); // minimum pause between checkpoints
env.getCheckpointConfig().setPreferCheckpointForRecovery(true); // prefer checkpoints for recovery
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1); // tolerable checkpoint failures
// checkpoint retention policy
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// externalized checkpoint path
env.setStateBackend(new FsStateBackend(new URI("hdfs://172.22.20.205:8020/flink/checkpoints"), false));
TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)));
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

String sourceDDL = String.format(
        "CREATE TABLE debezium_source (" +
        " id INT NOT NULL," +
        " name STRING," +
        " description STRING," +
        " weight Double" +
        ") WITH (" +
        " 'connector' = 'kafka-0.11'," +
        " 'topic' = '%s'," +
        " 'properties.bootstrap.servers' = '%s'," +
        " 'scan.startup.mode' = 'group-offsets'," +
        " 'format' = 'debezium-json'" +
        ")", "ddd", " 172.22.20.206:9092");
String sinkDDL = "CREATE TABLE sink (" +
        " id INT NOT NULL," +
        " name STRING," +
        " description STRING," +
        " weight Double," +
        " PRIMARY KEY (id, name, description, weight) NOT ENFORCED" +
        ") WITH (" +
        " 'connector' = 'jdbc'," +
        " 'url' = 'jdbc:mysql://172.27.4.22:3306/test?autoReconnect=true'," +
        " 'table-name' = 'products'," +
        " 'driver'= 'com.mysql.cj.jdbc.Driver'," +
        " 'username'='DataPip'," +
        " 'password'='DataPip'" +
        ")";
String dml = "INSERT INTO sink SELECT id, name, description, weight FROM debezium_source GROUP BY id, name, description, weight";
tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);
tEnv.executeSql(dml);
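
For reference, a minimal sketch of what godfrey and Jark suggest in the replies quoted earlier in this
digest: write all columns straight to the sink without the GROUP BY, so op=d (DELETE) records from the
debezium-json source are not dropped as dirty data at a group-by node. (Note that another reply above
reports a planner exception with this pattern on 1.11.0, so treat it as a suggestion to try rather than
a confirmed fix.)

// Variant suggested in the replies: no GROUP BY, all columns go straight to the sink.
String dmlNoGroupBy = "INSERT INTO sink SELECT id, name, description, weight FROM debezium_source";
tEnv.executeSql(dmlNoGroupBy);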





flink reports an error when using the debezium-json format

2020-07-13 Thread
log4j:WARN No appenders could be found for logger
(org.apache.flink.table.module.ModuleManager).
log4j:WARN Please initialize the log4j system properly.
Exception in thread "main" org.apache.flink.table.api.TableException: Provided trait [BEFORE_AND_AFTER] can't satisfy required trait [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue.
Current node is TableSourceScan(table=[[default_catalog, default_database, ddd]], fields=[id, age])
at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.$anonfun$visitChildren$2(FlinkChangelogModeInferenceProgram.scala:626)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:614)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.$anonfun$visitSink$1(FlinkChangelogModeInferenceProgram.scala:690)
at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:240)


