How does the downstream detect the duplicate data? Is the downstream consumer's isolation.level set to read_committed?
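
For reference, if the downstream check is done with a plain Kafka consumer, the setting in question might look roughly like the sketch below. It uses string config keys instead of the ConsumerConfig constants so that it compiles without the Kafka client jar. The class name, broker address, and group id are placeholders, not taken from this thread. Kafka consumers default to read_uncommitted, in which case records from open or aborted transactions are returned too, and these can look like duplicates.

```java
import java.util.Properties;

public class ReadCommittedConsumerConfig {

    // Builds consumer properties for a downstream reader of the sink topic.
    // Plain string keys are used here; they match the Kafka consumer config names.
    public static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Only return records from committed transactions.
        // The consumer default is read_uncommitted.
        props.setProperty("isolation.level", "read_committed");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder broker address and group id (assumptions for illustration).
        Properties props = build("localhost:9092", "dedup-check");
        System.out.println("isolation.level=" + props.getProperty("isolation.level"));
    }
}
```

The resulting Properties object would then be passed to whatever consumer is used to inspect topic B.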


On 2021-08-02 18:14:27, "Jim Chen" <chenshuai19950...@gmail.com> wrote:
>Hi 刘建刚,
>I used stop with savepoint, but I still see duplicate data downstream.
>Stop command:
>/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink stop \
>-yid application_1625497885855_703064 \
>-p hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint \
>-d 55e7ebb6fa38faaba61b4b9a7cd89827
>
>Restart command:
>/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
>-m yarn-cluster \
>-yjm 4096 -ytm 4096 \
>-ynm User_Click_Log_Split_All \
>-yqu syh_offline \
>-ys 2 \
>-d \
>-p 64 \
>-s hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-55e7eb-11203031f2a5 \
>-n \
>-c com.datacenter.etl.ods.common.mobile.UserClickLogAll \
>/opt/case/app/realtime/v1.0/batch/buryingpoint/paiping/all/realtime_etl-1.0-SNAPSHOT.jar
>
>
>刘建刚 <liujiangangp...@gmail.com> wrote on Mon, Aug 2, 2021 at 3:49 PM:
>
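>> cancel with savepoint is the older interface and can produce duplicate
>> data in Kafka. The newer stop with savepoint stops emitting data while
>> the savepoint is being taken, which avoids the duplicates. For details, see: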
>>
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/savepoints/
>>
>> Jim Chen <chenshuai19950...@gmail.com> wrote on Mon, Aug 2, 2021 at 2:33 PM:
>>
>> > I restarted the job from a savepoint, using the following commands:
>> >
>> > cancel command:
>> >
>> > /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink cancel \
>> > -yid application_1625497885855_698371 \
>> > -s hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint \
>> > 59cf6ccc83aa163bd1e0cd3304dfe06a
>> >
>> > print savepoint:
>> >
>> > hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494
>> >
>> > restart command:
>> >
>> > /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
>> > -m yarn-cluster \
>> > -yjm 4096 -ytm 4096 \
>> > -ynm User_Click_Log_Split_All \
>> > -yqu syh_offline \
>> > -ys 2 \
>> > -d \
>> > -p 64 \
>> > -s hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494 \
>> > -n \
>> > -c com.datacenter.etl.ods.common.mobile.UserClickLogAll \
>> > /opt/case/app/realtime/v1.0/batch/buryingpoint/paiping/all/realtime_etl-1.0-SNAPSHOT.jar
>> >
>> > Jim Chen <chenshuai19950...@gmail.com> wrote on Mon, Aug 2, 2021 at 2:01 PM:
>> >
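>> > > Hi all, I have a Flink job that consumes Kafka topic A and writes to Kafka topic B.
>> > > After restarting the job from a savepoint, I found duplicated records in topic B.
>> > > Can anyone help me understand why? Thanks!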
>> > >
>> > > My Versions
>> > > Flink 1.12.4
>> > > Kafka 2.0.1
>> > > Java 1.8
>> > >
>> > > Core code:
>> > >
>> > > env.enableCheckpointing(300000);
>> > > env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>> > > env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>> > >
>> > > DataStream dataDS = env.addSource(kafkaConsumer).map(xxx);
>> > >
>> > > tableEnv.createTemporaryView("data_table",dataDS);
>> > > String sql = "select * from data_table a inner join "
>> > >     + "hive_catalog.dim.dim.project for system_time as of a.proctime as b "
>> > >     + "on a.id = b.id";
>> > > Table table = tableEnv.sqlQuery(sql);
>> > > DataStream resultDS = tableEnv.toAppendStream(table, Row.class).map(xx);
>> > >
>> > > // Kafka producer parameter
>> > > Properties producerProps = new Properties();
>> > > producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
>> > > producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
>> > > producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, kafkaBufferMemory);
>> > > producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaBatchSize);
>> > > producerProps.put(ProducerConfig.LINGER_MS_CONFIG, kafkaLingerMs);
>> > > producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 300000);
>> > > producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
>> > > producerProps.put(ProducerConfig.RETRIES_CONFIG, "5");
>> > > producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
>> > > producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
>> > >
>> > > resultDS.addSink(new FlinkKafkaProducer<JSONObject>(
>> > >                 sinkTopic,
>> > >                 new JSONSchema(),
>> > >                 producerProps,
>> > >                 new FlinkFixedPartitioner<>(),
>> > >                 FlinkKafkaProducer.Semantic.EXACTLY_ONCE,
>> > >                 5))
>> > >         .setParallelism(sinkParallelism);
>> > >
>> >
>>
