The error message says that we are trying to reuse a transaction id that is
currently being used or has expired.

I am not 100% sure how this can happen. My suspicion is that you have
resumed a job multiple times from the same savepoint. Have you checked that
there is no other job which has been resumed from the same savepoint and
which is currently running or has run and completed checkpoints?

@pnowojski <pnowoj...@apache.org> @Becket Qin <becket....@gmail.com> how
does the transaction id generation ensure that we don't have a clash of
transaction ids if we resume the same job multiple times from the same
savepoint? From the code, I see that we have a TransactionalIdsGenerator
which is initialized with the taskName and the operator UID.
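
To make the concern concrete, here is a minimal sketch of why ids derived
only from the task name and operator UID could collide. This is not the
actual TransactionalIdsGenerator code: the class name, the generateIds
helper, the id format, and the pool size are all made up for illustration.
The assumption is simply that the ids are a deterministic function of
values that are identical in both resumed jobs.

```java
import java.util.LinkedHashSet;
import java.util.Set;

public class TransactionalIdClashSketch {

    // Hypothetical id scheme (an assumption, not Flink's real one):
    // the id depends only on task name, operator UID, subtask index and a counter.
    static Set<String> generateIds(String taskName, String operatorUid,
                                   int subtaskIndex, int poolSize) {
        String prefix = taskName + "-" + operatorUid;
        Set<String> ids = new LinkedHashSet<>();
        for (int i = 0; i < poolSize; i++) {
            long n = (long) subtaskIndex * poolSize + i;
            ids.add(prefix + "-" + n);
        }
        return ids;
    }

    public static void main(String[] args) {
        // Two jobs resumed from the same savepoint see the same inputs.
        Set<String> firstJob = generateIds("Sink: Sink", "uid-1", 0, 5);
        Set<String> secondJob = generateIds("Sink: Sink", "uid-1", 0, 5);

        // Every id of the first job is also produced by the second job.
        Set<String> overlap = new LinkedHashSet<>(firstJob);
        overlap.retainAll(secondJob);
        System.out.println("shared ids: " + overlap.size());
    }
}
```

If the real generator behaves like this, the second job would register
producers under ids the first job still holds, and the broker would bump
the epoch and fence the older producer. Whether Flink adds anything
job-specific to the prefix is exactly what I am asking above.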

fyi: @Arvid Heise <ar...@apache.org>

Cheers,
Till


On Mon, May 31, 2021 at 11:10 AM 周瑞 <rui.z...@woqutech.com> wrote:

> Hi:
>       When "sink.semantic = exactly-once", the following exception is
> thrown when recovering from a savepoint:
>
>        public static final String KAFKA_TABLE_FORMAT =
>             "CREATE TABLE "+TABLE_NAME+" (\n" +
>                     "  "+COLUMN_NAME+" STRING\n" +
>                     ") WITH (\n" +
>                     "   'connector' = 'kafka',\n" +
>                     "   'topic' = '%s',\n" +
>                     "   'properties.bootstrap.servers' = '%s',\n" +
>                     "   'sink.semantic' = 'exactly-once',\n" +
>                     "   'properties.transaction.timeout.ms' = '900000',\n" +
>                     "   'sink.partitioner' = 'com.woqutench.qmatrix.cdc.extractor.PkPartitioner',\n" +
>                     "   'format' = 'dbz-json'\n" +
>                     ")\n";
>   [] - Source: TableSourceScan(table=[[default_catalog, default_database, debezium_source]], fields=[data]) -> Sink: Sink(table=[default_catalog.default_database.KafkaTable], fields=[data]) (1/1)#859 (075273be72ab01bf1afd3c066876aaa6) switched from INITIALIZING to FAILED with failure cause: org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
>     at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352)
>     at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260)
>     at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>     at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:572)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
>     at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414)
>     at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
>     at java.lang.Thread.run(Thread.java:748)
>
