Hi Jim,

I have to ask more precisely in order to understand your situation better 
(even if you already answered in part):

With the software that you use to detect the duplicate messages (i.e. a 
consumer), under normal conditions, without a savepoint restore (a quick way 
to check is sketched after this list):

  *   Do you receive/see a batch of produced records every 3/5 minutes?
     *   In this case the consumer is in read-committed mode, and the problem 
is not related to what I replied before:
     *   We would have to raise the question back to the community
  *   Do you receive/see (single) produced records outside this 3/5 minute 
checkpoint interval?
     *   In this case the consumer is in read-uncommitted mode (or the producer 
is not transactional for some reason), and we would expect duplicates after 
restoring a savepoint
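
If it helps, here is a minimal verification consumer, purely as a sketch (the 
broker address, group id, and topic name below are placeholders, not taken from 
your setup). Run it against topic B while the job is healthy and watch when 
records arrive:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReadCommittedCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");  // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "duplicate-check");       // placeholder
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); // the decisive setting
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("topic-B"));       // placeholder
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // With read_committed, records from a transactional producer become
                    // visible in bursts at each checkpoint; a steady trickle means you are
                    // effectively reading uncommitted data (or the producer is not transactional).
                    System.out.printf("%d %s%n", System.currentTimeMillis(), record.value());
                }
            }
        }
    }
}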

… sorry for being picky with this question, but the precise answer makes all 
the difference in how to continue 😊

… let’s stay on the mailing list to give others a chance to reply

Matthias


From: Jim Chen <chenshuai19950...@gmail.com>
Sent: Monday, 2 August 2021 10:56
To: Schwalbe Matthias <matthias.schwa...@viseca.ch>
Subject: Re: Can anyone help me? Flink consumes duplicate Kafka messages after 
savepoint restart

Hi Schwalbe,

The duplicate messages should be read-committed messages. I have now set the 
checkpoint interval to 3 minutes and the transaction timeout to 5 minutes.
When I restart the job via savepoint, some messages are still duplicated.

Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote on Monday, 2 August 2021 
at 14:53:
Hi Jim,

I’ve got two comments regarding your question:

  *   The consumer that shows the duplicate messages: does it read only 
committed messages, or also uncommitted ones?

     *   You can expect duplicate uncommitted messages, but not duplicate 
committed messages
     *   As your checkpoint interval is 5 minutes, messages should only be 
committed every 5 minutes (which leads to my second comment)

  *   Your checkpoint interval is 5 minutes, i.e. your transactions are opened 
at the beginning of a checkpoint interval and only committed after that 
interval (roughly 5 minutes). This is very close to your transaction timeout of 
(again) 5 minutes, therefore transactions might time out before the producer 
even attempts to commit them.

     *   You might want to set the checkpoint interval to a much lower value 
(see the sketch below)
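
For illustration, a minimal sketch of the suggested relation, reusing the names 
from your code below (the concrete values are examples I picked, not 
prescriptions):

// Keep transaction.timeout.ms well above the checkpoint interval, so that
// transactions cannot expire before Flink commits them on checkpoint completion.
env.enableCheckpointing(60000);                                         // checkpoint (and commit) every 1 minute
producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "900000"); // 15 minutes
// Note: the broker caps this value via transaction.max.timeout.ms (default 15 minutes).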

Get back to me for clarifications 😊

Thias




From: Jim Chen <chenshuai19950...@gmail.com>
Sent: Monday, 2 August 2021 08:34
To: flink_user <user@flink.apache.org>
Subject: Re: Can anyone help me? Flink consumes duplicate Kafka messages after 
savepoint restart

I restarted the Flink job via a savepoint. The commands are as follows:

cancel command:

/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink cancel \
-yid application_1625497885855_698371 \
-s hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint \
59cf6ccc83aa163bd1e0cd3304dfe06a

printed savepoint path:

hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494


restart command:

/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
-m yarn-cluster \
-yjm 4096 -ytm 4096 \
-ynm User_Click_Log_Split_All \
-yqu syh_offline \
-ys 2 \
-d \
-p 64 \
-s hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494 \
-n \
-c com.datacenter.etl.ods.common.mobile.UserClickLogAll \
/opt/case/app/realtime/v1.0/batch/buryingpoint/paiping/all/realtime_etl-1.0-SNAPSHOT.jar

Jim Chen <chenshuai19950...@gmail.com> wrote on Monday, 2 August 2021 at 13:51:
Hi all, my Flink job consumes Kafka topic A and writes to Kafka topic B. When I 
restart the Flink job via a savepoint, topic B receives some duplicate 
messages. Can anyone help me solve this problem? Thanks!

My Versions:
Flink 1.12.4
Kafka 2.0.1
Java 1.8

Core code:
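// checkpoint (and hence Kafka transaction commit) interval: 300000 ms = 5 minutes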
env.enableCheckpointing(300000);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

DataStream dataDS = env.addSource(kafkaConsumer).map(xxx);

tableEnv.createTemporaryView("data_table",dataDS);
String sql = "select * from data_table a inner join 
hive_catalog.dim.dim.project for system_time as of a.proctime as b on 
a.id = b.id";
Table table = tableEnv.sqlQuery(sql);
DataStream resultDS = tableEnv.toAppendStream(table, Row.class).map(xx);

// Kafka producer parameter
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, kafkaBufferMemory);
producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaBatchSize);
producerProps.put(ProducerConfig.LINGER_MS_CONFIG, kafkaLingerMs);
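// NOTE: 300000 ms = 5 minutes, the same as the checkpoint interval above, so
// transactions may expire before Flink attempts to commit them (see discussion above)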
producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 300000);
producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
producerProps.put(ProducerConfig.RETRIES_CONFIG, "5");
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");

resultDS.addSink(new FlinkKafkaProducer<JSONObject>(sinkTopic, new 
JSONSchema(), producerProps, new FlinkFixedPartitioner<>(), 
FlinkKafkaProducer.Semantic.EXACTLY_ONCE, 5))
                .setParallelism(sinkParallelism);
