The error message says that we are trying to reuse a transaction id that is currently being used or has expired.
I am not 100% sure how this can happen. My suspicion is that you have
resumed a job multiple times from the same savepoint. Have you checked
that there is no other job which has been resumed from the same savepoint
and which is currently running, or which has run and completed checkpoints?

@pnowojski <pnowoj...@apache.org> @Becket Qin <becket....@gmail.com> how
does the transaction id generation ensure that we don't have a clash of
transaction ids if we resume the same job multiple times from the same
savepoint? From the code, I do see that we have a TransactionalIdsGenerator
which is initialized with the taskName and the operator UID.

fyi: @Arvid Heise <ar...@apache.org>

Cheers,
Till

On Mon, May 31, 2021 at 11:10 AM 周瑞 <rui.z...@woqutech.com> wrote:

> Hi:
> When "sink.semantic = exactly-once", the following exception is thrown
> when recovering from savepoint:
>
> public static final String KAFKA_TABLE_FORMAT =
>     "CREATE TABLE " + TABLE_NAME + " (\n" +
>     "  " + COLUMN_NAME + " STRING\n" +
>     ") WITH (\n" +
>     "  'connector' = 'kafka',\n" +
>     "  'topic' = '%s',\n" +
>     "  'properties.bootstrap.servers' = '%s',\n" +
>     "  'sink.semantic' = 'exactly-once',\n" +
>     "  'properties.transaction.timeout.ms' = '900000',\n" +
>     "  'sink.partitioner' = 'com.woqutench.qmatrix.cdc.extractor.PkPartitioner',\n" +
>     "  'format' = 'dbz-json'\n" +
>     ")\n";
>
> [] - Source: TableSourceScan(table=[[default_catalog, default_database,
> debezium_source]], fields=[data]) -> Sink: Sink
> (table=[default_catalog.default_database.KafkaTable], fields=[data])
> (1/1)#859 (075273be72ab01bf1afd3c066876aaa6) switched from INITIALIZING
> to FAILED with failure cause: org.apache.kafka.common.KafkaException:
> Unexpected error in InitProducerIdResponse; Producer attempted an
> operation with an old epoch. Either there is a newer producer with the
> same transactionalId, or the producer's transaction has been expired by
> the broker.
>     at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352)
>     at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260)
>     at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>     at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:572)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
>     at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414)
>     at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
>     at java.lang.Thread.run(Thread.java:748)
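
To illustrate the suspected clash: a generator seeded with the task name and
operator UID is fully deterministic, so two jobs restored from the same
savepoint will request the same transactional ids, and the second producer
to call InitProducerId bumps the epoch and fences the first. The sketch below
is a simplified illustration of that property, not Flink's actual
TransactionalIdsGenerator; the class name `TransactionalIdSketch`, the id
layout, and the pool-size parameter are all hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch: transactional ids derived deterministically from a
// prefix (task name + operator UID) plus the subtask index and a counter.
// Deterministic derivation means two jobs restored from the same savepoint
// produce identical id sets and therefore fence each other on the broker.
public class TransactionalIdSketch {
    private final String prefix;      // e.g. taskName + "-" + operatorUid
    private final int subtaskIndex;   // which parallel sink instance
    private final int poolSize;       // ids reserved per subtask

    public TransactionalIdSketch(String prefix, int subtaskIndex, int poolSize) {
        this.prefix = prefix;
        this.subtaskIndex = subtaskIndex;
        this.poolSize = poolSize;
    }

    /** Generate the pool of ids this subtask cycles through across checkpoints. */
    public List<String> generateIds() {
        List<String> ids = new ArrayList<>();
        int firstId = subtaskIndex * poolSize; // disjoint range per subtask
        for (int i = 0; i < poolSize; i++) {
            ids.add(prefix + "-" + (firstId + i));
        }
        return ids;
    }

    public static void main(String[] args) {
        // Two jobs restored from the same savepoint share prefix and index...
        TransactionalIdSketch jobA = new TransactionalIdSketch("Sink: KafkaTable-uid42", 0, 5);
        TransactionalIdSketch jobB = new TransactionalIdSketch("Sink: KafkaTable-uid42", 0, 5);
        // ...so their transactional id sets are identical -> broker-side fencing.
        System.out.println(jobA.generateIds().equals(jobB.generateIds())); // true
    }
}
```

Under this model the ids only stay disjoint while exactly one job owns a
given (prefix, subtask) pair, which is why running two restores of the same
savepoint concurrently would trigger the "old epoch" error above.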