Hi Jim,

I’ve got to ask more precisely in order to understand your situation better (even if you already answered):
With the software that you use to determine that you’ve got duplicate messages (i.e. a consumer), under normal conditions, without a savepoint:

* Do you receive/see a batch of produced records every 3/5 minutes?
  * In this case the consumer is in read-committed mode and the problem is not related to what I replied before.
  * We would have to raise the question back to the community.
* Do you receive/see produced (single) records outside this 3/5-minute checkpoint interval?
  * In this case the consumer is in read-uncommitted mode (or the producer is not transactional for some reason), and we would expect duplicates after restoring a savepoint.

… sorry for being picky with this question; the precise answer makes all the difference for how to continue 😊
… let’s stay on the mailing list to give others a chance to reply

Matthias

From: Jim Chen <chenshuai19950...@gmail.com>
Sent: Monday, 2 August 2021 10:56
To: Schwalbe Matthias <matthias.schwa...@viseca.ch>
Subject: Re: Any one can help me? Flink consume Kafka duplicate message via savepoint restart

Hi Schwalbe,

The duplicate messages should be read-committed messages. I have now set the checkpoint interval to 3 minutes and the transaction timeout to 5 minutes. When I restart the job via savepoint, some messages are still duplicated.

Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote on Monday, 2 August 2021 at 14:53:

Hi Jim,

I’ve got two comments regarding your question:

* The consumer that shows duplicate messages: does it read only committed messages, or also uncommitted ones?
  * You can expect duplicate uncommitted messages, but not duplicate committed messages.
  * As your checkpoint interval is 5 minutes, messages should only be committed every 5 minutes (which leads to my second comment).
* Your checkpoint interval is 5 minutes, i.e. your transactions will be opened at the beginning of a checkpoint interval and only committed after that interval (roughly 5 minutes).
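Whether a consumer sees only committed records is controlled by the Kafka consumer setting `isolation.level`. A minimal sketch of setting it explicitly (the broker address and group id are placeholders, not from the thread):

```java
import java.util.Properties;

public class IsolationCheck {
    public static void main(String[] args) {
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092"); // placeholder
        consumerProps.put("group.id", "dup-check");               // placeholder
        // Kafka's default is "read_uncommitted": the consumer also sees records
        // of transactions that are still open or were later aborted, so
        // "duplicates" are expected there. "read_committed" hides those records,
        // so duplicates seen in that mode would point to a different cause.
        consumerProps.put("isolation.level", "read_committed");
        System.out.println(consumerProps.getProperty("isolation.level"));
    }
}
```

A consumer built from these properties would answer the question above: records arriving only in batches at checkpoint boundaries indicate read-committed behavior.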
This is very close to your transaction timeout of (again) 5 minutes; therefore transactions might time out before the producer even attempts to commit them.

* You might want to set the checkpoint interval to a much lower value.

Get back to me for clarifications 😊

Thias

From: Jim Chen <chenshuai19950...@gmail.com>
Sent: Monday, 2 August 2021 08:34
To: flink_user <user@flink.apache.org>
Subject: Re: Any one can help me? Flink consume Kafka duplicate message via savepoint restart

I restart the Flink job via savepoint. Commands as follows:

cancel command:

/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink cancel \
  -yid application_1625497885855_698371 \
  -s hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint \
  59cf6ccc83aa163bd1e0cd3304dfe06a

printed savepoint:

hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494

restart command:

/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
  -m yarn-cluster \
  -yjm 4096 -ytm 4096 \
  -ynm User_Click_Log_Split_All \
  -yqu syh_offline \
  -ys 2 \
  -d \
  -p 64 \
  -s hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494 \
  -n \
  -c com.datacenter.etl.ods.common.mobile.UserClickLogAll \
  /opt/case/app/realtime/v1.0/batch/buryingpoint/paiping/all/realtime_etl-1.0-SNAPSHOT.jar

Jim Chen <chenshuai19950...@gmail.com> wrote on Monday, 2 August 2021 at 13:51:

Hi all,

My Flink job consumes Kafka topic A and writes to Kafka topic B. When I restart my Flink job via savepoint, topic B has some duplicate messages. Can anyone help me solve this problem? Thanks!
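The timing problem described above can be made concrete with a small calculation. The 5-minute values mirror the thread; the safer values and the headroom factor are illustrative assumptions, not a Flink requirement:

```java
public class TimeoutHeadroom {
    public static void main(String[] args) {
        long checkpointIntervalMs = 5 * 60 * 1000;  // 5 min, as in the thread
        long transactionTimeoutMs = 5 * 60 * 1000;  // 5 min, as in the thread
        // A transaction opened at the start of a checkpoint interval is only
        // committed once the checkpoint completes, so it stays open for about
        // the whole interval: the timeout has zero headroom and any delay
        // (backpressure, slow checkpoint) aborts the transaction.
        System.out.println(transactionTimeoutMs > checkpointIntervalMs);

        // Illustrative safer values: a much shorter interval and a timeout
        // that leaves an order of magnitude of headroom.
        long saferIntervalMs = 60_000;           // 1 min
        long saferTimeoutMs = 15 * 60 * 1000;    // 15 min
        System.out.println(saferTimeoutMs > 10 * saferIntervalMs);
    }
}
```

The first comparison prints `false` (no headroom at all), the second `true`.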
My versions:

Flink 1.12.4
Kafka 2.0.1
Java 1.8

Core code:

env.enableCheckpointing(300000);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

DataStream dataDS = env.addSource(kafkaConsumer).map(xxx);
tableEnv.createTemporaryView("data_table", dataDS);
String sql = "select * from data_table a inner join hive_catalog.dim.dim.project for system_time as of a.proctime as b on a.id = b.id";
Table table = tableEnv.sqlQuery(sql);
DataStream resultDS = tableEnv.toAppendStream(table, Row.class).map(xx);

// Kafka producer parameters
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, kafkaBufferMemory);
producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaBatchSize);
producerProps.put(ProducerConfig.LINGER_MS_CONFIG, kafkaLingerMs);
producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 300000);
producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
producerProps.put(ProducerConfig.RETRIES_CONFIG, "5");
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");

resultDS.addSink(new FlinkKafkaProducer<JSONObject>(
        sinkTopic,
        new JSONSchema(),
        producerProps,
        new FlinkFixedPartitioner<>(),
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE,
        5))
    .setParallelism(sinkParallelism);
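Applying Matthias’s suggestion to the core code above would mean shortening the checkpoint interval and raising the transaction timeout. A sketch of only the changed settings (the concrete values are illustrative assumptions, not from the thread; `env` and `producerProps` are as in the code above):

```java
// Illustrative adjustment only, not a verified fix:
env.enableCheckpointing(60_000);  // checkpoint every 1 minute instead of 5

// Keep the transaction timeout well above the checkpoint interval, but
// no larger than the broker's transaction.max.timeout.ms
// (broker default: 15 minutes), or the producer will fail to start.
producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 900000);
```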
This message is intended only for the named recipient and may contain confidential or privileged information. As the confidentiality of email communication cannot be guaranteed, we do not accept any responsibility for the confidentiality and the intactness of this message. If you have received it in error, please advise the sender by return e-mail and delete this message and any attachments. Any unauthorised use or dissemination of this information is strictly prohibited.