Hi,

Could you confirm that Kafka contains the complete change history, i.e. that there is a corresponding INSERT message before this DELETE message?
If not, this behaviour can indeed happen: the DELETE record is treated as dirty data at the GROUP BY node and dropped.
Alternatively, as godfrey suggested, drop the GROUP BY and INSERT all columns into the sink directly; then the DELETE will not be discarded.
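For example, roughly like this (a minimal sketch, reusing the table and column names from the job quoted below), so that op=d records reach the JDBC sink as DELETEs instead of being swallowed by the aggregation:

        // no GROUP BY: the Debezium changelog (including DELETEs) is forwarded to the keyed JDBC sink as-is
        String dml = "INSERT INTO sink SELECT id, name, description, weight FROM debezium_source";
        tEnv.executeSql(dml);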

Best,
Jark

On Thu, 16 Jul 2020 at 21:56, godfrey he <godfre...@gmail.com> wrote:

> Why do you GROUP BY id, name, description, weight?
> Doesn't a plain "INSERT INTO sink SELECT id, name, description, weight FROM
> debezium_source" satisfy the requirement?
>
曹武 <14701319...@163.com> wrote on Thu, 16 Jul 2020 at 21:30:
>
> > I am using the DDL part of Flink 1.11.0 with the debezium-json format for CDC.
> > After restoring from a checkpoint, newly arriving records with op=d fail to be deleted.
> > Restart command: ./bin/flink run -m yarn-cluster /root/bigdata-flink-1.0.jar -s
> > hdfs://prehadoop01:8020/flink/checkpoints/4cc5df8b96e90c1c2a4d3719a77f51d1/chk-819/_metadata
> > Code:
> >         EnvironmentSettings settings = EnvironmentSettings.newInstance()
> >                 .useBlinkPlanner()
> >                 .inStreamingMode()
> >                 .build();
> >
> >         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> >
> >         env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
> >         env.getCheckpointConfig().setCheckpointTimeout(6000L); // checkpoint timeout
> >         env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // max number of concurrent checkpoints
> >         env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10L); // minimum pause between checkpoints
> >         env.getCheckpointConfig().setPreferCheckpointForRecovery(true); // prefer checkpoints for failure recovery
> >         env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1); // tolerable number of checkpoint failures
> >         // checkpoint file cleanup policy
> >         env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> >         // externalized checkpoint path
> >         env.setStateBackend(new FsStateBackend(new URI("hdfs://172.22.20.205:8020/flink/checkpoints"), false));
> >         TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS))); // (truncated in the original mail; looks like the tail of a restart-strategy configuration)
> >         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
> >         String sourceDDL = String.format(
> >                 "CREATE TABLE debezium_source (" +
> >                         " id INT NOT NULL," +
> >                         " name STRING," +
> >                         " description STRING," +
> >                         " weight DOUBLE" +
> >                         ") WITH (" +
> >                         " 'connector' = 'kafka-0.11'," +
> >                         " 'topic' = '%s'," +
> >                         " 'properties.bootstrap.servers' = '%s'," +
> >                         " 'scan.startup.mode' = 'group-offsets'," +
> >                         " 'format' = 'debezium-json'" +
> >                         ")", "ddd", "172.22.20.206:9092");
> >         String sinkDDL = "CREATE TABLE sink (" +
> >                 " id INT NOT NULL," +
> >                 " name STRING," +
> >                 " description STRING," +
> >                 " weight DOUBLE," +
> >                 " PRIMARY KEY (id, name, description, weight) NOT ENFORCED" +
> >                 ") WITH (" +
> >                 " 'connector' = 'jdbc'," +
> >                 " 'url' = 'jdbc:mysql://172.27.4.22:3306/test?autoReconnect=true'," +
> >                 " 'table-name' = 'products'," +
> >                 " 'driver' = 'com.mysql.cj.jdbc.Driver'," +
> >                 " 'username' = 'DataPip'," +
> >                 " 'password' = 'DataPip'" +
> >                 ")";
> >         String dml = "INSERT INTO sink SELECT id, name, description, weight" +
> >                 " FROM debezium_source GROUP BY id, name, description, weight";
> >         tEnv.executeSql(sourceDDL);
> >         tEnv.executeSql(sinkDDL);
> >         tEnv.executeSql(dml);
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
> >
>
