Hi,

Can you confirm that the Kafka topic contains the complete snapshot data? That is, was there a corresponding INSERT message before this DELETE message? If not, the behavior you describe can occur: the DELETE is treated as dirty data at the group-by node and dropped. Alternatively, as godfrey suggested, drop the GROUP BY and INSERT all columns directly into the sink; the DELETE will then not be discarded.
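For reference, godfrey's suggestion amounts to the following statement (a minimal sketch; the table and column names are the ones from the program quoted below):

    -- No GROUP BY: the Debezium changelog, including DELETE messages,
    -- is forwarded to the upsert sink unchanged.
    INSERT INTO sink
    SELECT id, name, description, weight
    FROM debezium_source;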
Best,
Jark

On Thu, 16 Jul 2020 at 21:56, godfrey he <godfre...@gmail.com> wrote:

> Why GROUP BY id, name, description, weight at all?
> Doesn't a plain "INSERT INTO sink SELECT id, name, description, weight
> FROM debezium_source" meet the requirement?
>
> On Thu, 16 Jul 2020 at 21:30, 曹武 <14701319...@163.com> wrote:
>
> > I am using the DDL API of Flink 1.11.0 with the debezium-json format
> > for CDC. After restoring from a checkpoint, newly arriving op=d
> > records fail to delete the corresponding rows.
> > Restart command:
> > ./bin/flink run -m yarn-cluster /root/bigdata-flink-1.0.jar -s
> > hdfs://prehadoop01:8020/flink/checkpoints/4cc5df8b96e90c1c2a4d3719a77f51d1/chk-819/_metadata
> >
> > Code:
> > EnvironmentSettings settings = EnvironmentSettings.newInstance()
> >         .useBlinkPlanner()
> >         .inStreamingMode()
> >         .build();
> >
> > StreamExecutionEnvironment env =
> >         StreamExecutionEnvironment.getExecutionEnvironment();
> >
> > env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
> > env.getCheckpointConfig().setCheckpointTimeout(6000L);            // checkpoint timeout
> > env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);         // max concurrent checkpoints
> > env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10L);     // min pause between checkpoints
> > env.getCheckpointConfig().setPreferCheckpointForRecovery(true);   // prefer checkpoints for failure recovery
> > env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1); // tolerable checkpoint failures
> > // checkpoint file cleanup policy
> > env.getCheckpointConfig().enableExternalizedCheckpoints(
> >         CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> > // external checkpoint path
> > env.setStateBackend(new FsStateBackend(
> >         new URI("hdfs://172.22.20.205:8020/flink/checkpoints"), false));
> > // (the next line arrived truncated in the original mail; it looks like
> > // the tail of a restart-strategy configuration)
> > TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)));
> >
> > StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
> >
> > String sourceDDL = String.format(
> >         "CREATE TABLE debezium_source (" +
> >         "  id INT NOT NULL," +
> >         "  name STRING," +
> >         "  description STRING," +
> >         "  weight DOUBLE" +
> >         ") WITH (" +
> >         "  'connector' = 'kafka-0.11'," +
> >         "  'topic' = '%s'," +
> >         "  'properties.bootstrap.servers' = '%s'," +
> >         "  'scan.startup.mode' = 'group-offsets'," +
> >         "  'format' = 'debezium-json'" +
> >         ")", "ddd", "172.22.20.206:9092");
> >
> > String sinkDDL = "CREATE TABLE sink (" +
> >         "  id INT NOT NULL," +
> >         "  name STRING," +
> >         "  description STRING," +
> >         "  weight DOUBLE," +
> >         "  PRIMARY KEY (id, name, description, weight) NOT ENFORCED" +
> >         ") WITH (" +
> >         "  'connector' = 'jdbc'," +
> >         "  'url' = 'jdbc:mysql://172.27.4.22:3306/test?autoReconnect=true'," +
> >         "  'table-name' = 'products'," +
> >         "  'driver' = 'com.mysql.cj.jdbc.Driver'," +
> >         "  'username' = 'DataPip'," +
> >         "  'password' = 'DataPip'" +
> >         ")";
> >
> > String dml = "INSERT INTO sink SELECT id, name, description, weight " +
> >         "FROM debezium_source GROUP BY id, name, description, weight";
> >
> > tEnv.executeSql(sourceDDL);
> > tEnv.executeSql(sinkDDL);
> > tEnv.executeSql(dml);
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/