Hi Jim,
I have to ask more precisely in order to understand your situation better
(even if you already answered in part):
With the software that you use to determine that you've got duplicate messages
(i.e. a consumer), under normal conditions, without a savepoint:
* Do you receive/see a batch of produced records every 3/5 minutes?
  * In this case the consumer is in read-committed mode, and the problem
    is not related to what I replied before:
    * We would have to raise the question back to the community
* Do you receive/see (single) produced records outside this 3/5 minute
  checkpoint interval?
  * In this case the consumer is in read-uncommitted mode (or the producer
    is not transactional for some reason), and we would expect duplicates after
    restoring a savepoint
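For reference, the mode is controlled by the Kafka consumer's isolation.level
setting (default is read_uncommitted). A minimal sketch of the configuration for
a read-committed check consumer; the broker address and group id are placeholders:

```java
import java.util.Properties;

public class IsolationLevelCheck {
    // Builds consumer properties that only return committed records.
    // With read_committed, duplicates indicate a real exactly-once problem;
    // with the default read_uncommitted, duplicates after a savepoint restore
    // are expected and harmless.
    static Properties readCommittedProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers); // placeholder address
        props.put("group.id", "duplicate-check");         // hypothetical group id
        props.put("isolation.level", "read_committed");   // default: read_uncommitted
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = readCommittedProps("localhost:9092");
        System.out.println(props.getProperty("isolation.level"));
    }
}
```

The same setting works with the stock console consumer via
--consumer-property isolation.level=read_committed when inspecting topic B.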
… sorry for being picky with this question, but the precise answer makes all the
difference in how to continue 😊
… let’s stay on the mailing list to give others a chance to reply
Matthias
From: Jim Chen <[email protected]>
Sent: Montag, 2. August 2021 10:56
To: Schwalbe Matthias <[email protected]>
Subject: Re: Any one can help me? Flink consume Kafka duplicate message via
savepoint restart
Hi Schwalbe,
The duplicate messages should be read-committed messages. Now I have set the
checkpoint interval to 3 minutes and the transaction timeout to 5 minutes.
When I restart the job via savepoint, some messages are still duplicated.
Schwalbe Matthias <[email protected]> wrote on Monday,
2 August 2021 at 14:53:
Hi Jim,
I’ve got two comments regarding your question:
* The consumer that shows duplicate messages: does it read only committed
  messages, or also uncommitted messages?
  * You can expect duplicate uncommitted messages, but not duplicate
    committed messages
  * As your checkpoint interval is 5 minutes, messages should only be committed
    every 5 minutes (which leads to my second comment)
* Your checkpoint interval is 5 minutes, i.e. your transactions will be
  opened at the beginning of a checkpoint interval and only committed after that
  interval (roughly 5 minutes). This is very close to your transaction timeout of
  (again) 5 minutes, therefore transactions might time out before the producer
  even attempts to commit them.
* You might want to set the checkpoint interval to a much lower value
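The timing constraint above can be sketched as a quick check; the factor of two
is an illustrative safety margin, not a Flink requirement:

```java
public class TransactionTimeoutCheck {
    // A transaction is opened at the start of a checkpoint interval and
    // committed only when the checkpoint completes, so it lives for roughly
    // one interval (plus the checkpoint duration). The Kafka transaction
    // timeout must leave headroom beyond that.
    static boolean hasHeadroom(long checkpointIntervalMs, long transactionTimeoutMs) {
        // Illustrative rule of thumb: timeout at least twice the interval.
        return transactionTimeoutMs >= 2 * checkpointIntervalMs;
    }

    public static void main(String[] args) {
        // The settings discussed here: 5-minute interval, 5-minute timeout.
        System.out.println(hasHeadroom(300_000, 300_000)); // too tight
        // A 1-minute interval with a 15-minute timeout leaves ample headroom.
        System.out.println(hasHeadroom(60_000, 900_000));
    }
}
```

Note that the broker-side transaction.max.timeout.ms (15 minutes by default)
caps how high the producer's transaction timeout may be raised.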
Get back to me for clarifications 😊
Thias
From: Jim Chen <[email protected]>
Sent: Montag, 2. August 2021 08:34
To: flink_user <[email protected]>
Subject: Re: Any one can help me? Flink consume Kafka duplicate message via
savepoint restart
I restarted the flink job via savepoint. Commands as follows:
cancel command:
/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink cancel \
-yid application_1625497885855_698371 \
-s
hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint \
59cf6ccc83aa163bd1e0cd3304dfe06a
print savepoint:
hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494
restart command:
/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
-m yarn-cluster \
-yjm 4096 -ytm 4096 \
-ynm User_Click_Log_Split_All \
-yqu syh_offline \
-ys 2 \
-d \
-p 64 \
-s
hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494
\
-n \
-c com.datacenter.etl.ods.common.mobile.UserClickLogAll \
/opt/case/app/realtime/v1.0/batch/buryingpoint/paiping/all/realtime_etl-1.0-SNAPSHOT.jar
Jim Chen <[email protected]> wrote on Monday, 2 August 2021 at 13:51:
Hi all, my Flink job consumes Kafka topic A and writes to Kafka topic B. When I
restart my Flink job via savepoint, topic B has some duplicate messages. Can
anyone help me solve this problem? Thanks!
My Versions:
Flink 1.12.4
Kafka 2.0.1
Java 1.8
Core code:
env.enableCheckpointing(300000);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
DataStream dataDS = env.addSource(kafkaConsumer).map(xxx);
tableEnv.createTemporaryView("data_table",dataDS);
String sql = "select * from data_table a inner join
hive_catalog.dim.dim.project for system_time as of a.proctime as b on
a.id = b.id";
Table table = tableEnv.sqlQuery(sql);
DataStream resultDS = tableEnv.toAppendStream(table, Row.class).map(xx);
// Kafka producer parameter
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, kafkaBufferMemory);
producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaBatchSize);
producerProps.put(ProducerConfig.LINGER_MS_CONFIG, kafkaLingerMs);
producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 300000);
producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
producerProps.put(ProducerConfig.RETRIES_CONFIG, "5");
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
resultDS.addSink(new FlinkKafkaProducer<JSONObject>(sinkTopic, new
JSONSchema(), producerProps, new FlinkFixedPartitioner<>(),
FlinkKafkaProducer.Semantic.EXACTLY_ONCE, 5))
.setParallelism(sinkParallelism);
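A side note on the configuration above: the checkpoint interval
(env.enableCheckpointing(300000)) and the producer's TRANSACTION_TIMEOUT_CONFIG
(300000) are identical, which is exactly the tight situation described earlier
in the thread. A minimal illustration of the missing headroom:

```java
public class ConfigSanityCheck {
    // Transactions stay open for roughly one checkpoint interval before the
    // commit is attempted, so timeout minus interval is the available headroom.
    static long headroomMs(long checkpointIntervalMs, long transactionTimeoutMs) {
        return transactionTimeoutMs - checkpointIntervalMs;
    }

    public static void main(String[] args) {
        // Values from the job above: enableCheckpointing(300000) and
        // TRANSACTION_TIMEOUT_CONFIG = 300000, leaving no headroom at all.
        System.out.println("headroom (ms): " + headroomMs(300_000, 300_000));
    }
}
```

Raising transaction.timeout.ms well above the interval, or shortening the
interval, restores headroom; the broker's transaction.max.timeout.ms caps how
high the producer-side timeout may go.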
This message is intended only for the named recipient and may contain
confidential or privileged information. As the confidentiality of email
communication cannot be guaranteed, we do not accept any responsibility for the
confidentiality and the intactness of this message. If you have received it in
error, please advise the sender by return e-mail and delete this message and
any attachments. Any unauthorised use or dissemination of this information is
strictly prohibited.