I encountered exactly the same issue while experimenting in a testing
environment. I was not able to pinpoint the bug discussed in this thread, so
my workaround was to downgrade my own kafka-clients version from 2.5 to
2.4.1, matching the version that flink-connector-kafka is built against.
In Kafka 2.4.1, TransactionManager initializes producerIdAndEpoch with a
fresh instance:

this.producerIdAndEpoch = new ProducerIdAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH);

instead of aliasing the shared static constant, as 2.5 does:

this.producerIdAndEpoch = ProducerIdAndEpoch.NONE;

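To make the consequence of that one-line difference concrete, here is a
minimal, self-contained sketch (not Flink or Kafka code): once the manager's
field aliases a shared "constant", a reflective write through that field,
like the one Flink's resumeTransaction performs, is visible to every later
reader of the constant.

import java.lang.reflect.Field;

public class StaticAliasingDemo {

    // Stand-in for org.apache.kafka.clients.producer.internals.ProducerIdAndEpoch.
    static class ProducerIdAndEpoch {
        static final ProducerIdAndEpoch NONE = new ProducerIdAndEpoch(-1L, (short) -1);
        final long producerId;
        final short epoch;

        ProducerIdAndEpoch(long producerId, short epoch) {
            this.producerId = producerId;
            this.epoch = epoch;
        }
    }

    public static void main(String[] args) throws Exception {
        // Kafka 2.5 behaviour: the manager's field aliases the shared constant.
        ProducerIdAndEpoch managerState = ProducerIdAndEpoch.NONE;

        // resumeTransaction() behaviour: reflectively write the recovered
        // producerId into whatever object the field currently points to.
        Field f = ProducerIdAndEpoch.class.getDeclaredField("producerId");
        f.setAccessible(true);
        f.setLong(managerState, 4711L);

        // The "constant" is now polluted for every subsequent user.
        System.out.println(ProducerIdAndEpoch.NONE.producerId); // 4711, not -1
    }
}

With the 2.4.1 constructor the field starts out pointing at a private
instance, so the same reflective write never reaches the constant.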

On Thu, Jun 3, 2021 at 12:11 AM Till Rohrmann <trohrm...@apache.org> wrote:

> Thanks for the update. Skimming over the code, it does indeed look like we
> are overwriting the values of the static constant ProducerIdAndEpoch.NONE.
> I am not 100% sure how this causes the observed problem, though. I am also
> not an expert on the Flink Kafka connector or on Kafka itself, so I would
> appreciate it if someone more familiar could double-check this part of the
> code.
>
> Concerning the required change of an operator's UID, Piotr: is this a
> known issue and documented somewhere? I find it rather surprising from a
> user's point of view.
>
> Cheers,
> Till
>
> On Thu, Jun 3, 2021 at 8:53 AM Till Rohrmann <trohrm...@apache.org> wrote:
>
>> Forwarding 周瑞's message to a duplicate thread:
>>
>> After our analysis, we found a bug in the
>> org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.resumeTransaction
>> method. The analysis process is as follows:
>>
>>
>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction
>> public void initializeState(FunctionInitializationContext context) throws
>> Exception {
>>     state = context.getOperatorStateStore().getListState(stateDescriptor);
>>     boolean recoveredUserContext = false;
>>     if (context.isRestored()) {
>>         LOG.info("{} - restoring state", name());
>>         for (State<TXN, CONTEXT> operatorState : state.get()) {
>>             userContext = operatorState.getContext();
>>             List<TransactionHolder<TXN>> recoveredTransactions =
>>                     operatorState.getPendingCommitTransactions();
>>             List<TXN> handledTransactions = new
>> ArrayList<>(recoveredTransactions.size() + 1);
>>             for (TransactionHolder<TXN> recoveredTransaction :
>> recoveredTransactions) {
>>                 // If this fails to succeed eventually, there is
>>                 // actually data loss
>>                 recoverAndCommitInternal(recoveredTransaction);
>>                 handledTransactions.add(recoveredTransaction.handle);
>>                 LOG.info("{} committed recovered transaction {}", name(),
>> recoveredTransaction);
>>             }
>>
>>             {
>>                 TXN transaction =
>> operatorState.getPendingTransaction().handle;
>>                 recoverAndAbort(transaction);
>>                 handledTransactions.add(transaction);
>>                 LOG.info(
>>                         "{} aborted recovered transaction {}",
>>                         name(),
>>                         operatorState.getPendingTransaction());
>>             }
>>
>>             if (userContext.isPresent()) {
>>                 finishRecoveringContext(handledTransactions);
>>                 recoveredUserContext = true;
>>             }
>>         }
>>     }
>>
>> (1) recoverAndCommitInternal(recoveredTransaction):
>> The transactional id, producerId, and epoch previously stored in the state
>> are used to commit the transaction. However, we found that the
>> transactionManager's producerIdAndEpoch field aliases the static constant
>> ProducerIdAndEpoch.NONE, so writing the recovered values into it pollutes
>> ProducerIdAndEpoch.NONE itself.
>>
>> @Override
>> protected void recoverAndCommit(FlinkKafkaProducer.KafkaTransactionState
>> transaction) {
>>     if (transaction.isTransactional()) {
>>         FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
>>         try {
>>             producer =
>> initTransactionalProducer(transaction.transactionalId, false);
>>             producer.resumeTransaction(transaction.producerId,
>> transaction.epoch);
>>             producer.commitTransaction();
>>         } catch (InvalidTxnStateException | ProducerFencedException ex) {
>>             // That means we have committed this transaction before.
>>             LOG.warn(
>>                     "Encountered error {} while recovering transaction {}. "
>>                             + "Presumably this transaction has been already committed before",
>>                     ex,
>>                     transaction);
>>         } finally {
>>             if (producer != null) {
>>                 producer.close(0, TimeUnit.SECONDS);
>>             }
>>         }
>>     }
>> }
>>
>> public void resumeTransaction(long producerId, short epoch) {
>>     synchronized (producerClosingLock) {
>>         ensureNotClosed();
>>         Preconditions.checkState(
>>                 producerId >= 0 && epoch >= 0,
>>                 "Incorrect values for producerId %s and epoch %s",
>>                 producerId,
>>                 epoch);
>>         LOG.info(
>>                 "Attempting to resume transaction {} with producerId {} and epoch {}",
>>                 transactionalId,
>>                 producerId,
>>                 epoch);
>>
>>         Object transactionManager = getField(kafkaProducer, "transactionManager");
>>         synchronized (transactionManager) {
>>             Object topicPartitionBookkeeper =
>>                     getField(transactionManager, "topicPartitionBookkeeper");
>>
>>             invoke(
>>                     transactionManager,
>>                     "transitionTo",
>>                     getEnum(
>>                             "org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
>>             invoke(topicPartitionBookkeeper, "reset");
>>
>>             Object producerIdAndEpoch =
>>                     getField(transactionManager, "producerIdAndEpoch");
>>             setField(producerIdAndEpoch, "producerId", producerId);
>>             setField(producerIdAndEpoch, "epoch", epoch);
>>
>>             invoke(
>>                     transactionManager,
>>                     "transitionTo",
>>                     getEnum(
>>                             "org.apache.kafka.clients.producer.internals.TransactionManager$State.READY"));
>>
>>             invoke(
>>                     transactionManager,
>>                     "transitionTo",
>>                     getEnum(
>>                             "org.apache.kafka.clients.producer.internals.TransactionManager$State.IN_TRANSACTION"));
>>             setField(transactionManager, "transactionStarted", true);
>>         }
>>     }
>> }
>>
>>
>> public TransactionManager(LogContext logContext,
>>                           String transactionalId,
>>                           int transactionTimeoutMs,
>>                           long retryBackoffMs,
>>                           ApiVersions apiVersions) {
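>>     // Kafka 2.5: the field starts out aliasing the shared static constant
>>     // instead of a fresh ProducerIdAndEpoch instance as in 2.4.1.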
>>     this.producerIdAndEpoch = ProducerIdAndEpoch.NONE;
>>     this.transactionalId = transactionalId;
>>     this.log = logContext.logger(TransactionManager.class);
>>     this.transactionTimeoutMs = transactionTimeoutMs;
>>     this.transactionCoordinator = null;
>>     this.consumerGroupCoordinator = null;
>>     this.newPartitionsInTransaction = new HashSet<>();
>>     this.pendingPartitionsInTransaction = new HashSet<>();
>>     this.partitionsInTransaction = new HashSet<>();
>>     this.pendingRequests = new PriorityQueue<>(10,
>> Comparator.comparingInt(o -> o.priority().priority));
>>     this.pendingTxnOffsetCommits = new HashMap<>();
>>     this.partitionsWithUnresolvedSequences = new HashMap<>();
>>     this.partitionsToRewriteSequences = new HashSet<>();
>>     this.retryBackoffMs = retryBackoffMs;
>>     this.topicPartitionBookkeeper = new TopicPartitionBookkeeper();
>>     this.apiVersions = apiVersions;
>> }
>>
>>
>>
>> public class ProducerIdAndEpoch {
>>     public static final ProducerIdAndEpoch NONE =
>>             new ProducerIdAndEpoch(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH);
>>
>>     public final long producerId;
>>     public final short epoch;
>>
>>     public ProducerIdAndEpoch(long producerId, short epoch) {
>>         this.producerId = producerId;
>>         this.epoch = epoch;
>>     }
>>
>>     public boolean isValid() {
>>         return RecordBatch.NO_PRODUCER_ID < producerId;
>>     }
>>
>>     @Override
>>     public String toString() {
>>         return "(producerId=" + producerId + ", epoch=" + epoch + ")";
>>     }
>>
>>     @Override
>>     public boolean equals(Object o) {
>>         if (this == o) return true;
>>         if (o == null || getClass() != o.getClass()) return false;
>>
>>         ProducerIdAndEpoch that = (ProducerIdAndEpoch) o;
>>
>>         if (producerId != that.producerId) return false;
>>         return epoch == that.epoch;
>>     }
>>
>>     @Override
>>     public int hashCode() {
>>         int result = (int) (producerId ^ (producerId >>> 32));
>>         result = 31 * result + (int) epoch;
>>         return result;
>>     }
>>
>> }
>>
>> (2) In the second step,
>> recoverAndAbort(FlinkKafkaProducer.KafkaTransactionState transaction)
>> initializes a new transaction. By this point the producerId and epoch
>> written in step (1) have polluted ProducerIdAndEpoch.NONE. Because
>> initializeTransactions() passes the constant by reference, isEpochBump is
>> still false and the request is built as a first-time InitProducerId; the
>> request parameters taken from ProducerIdAndEpoch.NONE, however, now carry
>> the producerId and epoch of the transaction committed in step (1) instead
>> of -1, -1. So Kafka throws an exception:
>> Unexpected error in InitProducerIdResponse; Producer attempted an
>> operation with an old epoch. Either there is a newer producer with the same
>> transactionalId, or the producer's transaction has been expired by the
>> broker.
>>     at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352)
>>     at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260)
>>     at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>>     at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:572)
>>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
>>     at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414)
>>     at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312)
>>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
>>     at java.lang.Thread.run(Thread.java:748)
>>
>> protected void recoverAndAbort(FlinkKafkaProducer.KafkaTransactionState
>> transaction) {
>>     if (transaction.isTransactional()) {
>>         FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
>>         try {
>>             producer =
>> initTransactionalProducer(transaction.transactionalId, false);
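>>             // initTransactions() builds its InitProducerIdRequest from
>>             // ProducerIdAndEpoch.NONE, which step (1) has already polluted.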
>>             producer.initTransactions();
>>         } finally {
>>             if (producer != null) {
>>                 producer.close(0, TimeUnit.SECONDS);
>>             }
>>         }
>>     }
>> }
>>
>> public synchronized TransactionalRequestResult initializeTransactions() {
>>     return initializeTransactions(ProducerIdAndEpoch.NONE);
>> }
>>
>> synchronized TransactionalRequestResult
>> initializeTransactions(ProducerIdAndEpoch producerIdAndEpoch) {
>>     boolean isEpochBump = producerIdAndEpoch != ProducerIdAndEpoch.NONE;
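>>     // Reference comparison: a polluted NONE is still == NONE, so the
>>     // "first time" branch below is taken even though the request fields
>>     // carry the stale producerId and epoch.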
>>     return handleCachedTransactionRequestResult(() -> {
>>         // If this is an epoch bump, we will transition the state as part
>> of handling the EndTxnRequest
>>         if (!isEpochBump) {
>>             transitionTo(State.INITIALIZING);
>>             log.info("Invoking InitProducerId for the first time in order to acquire a producer ID");
>>         } else {
>>             log.info("Invoking InitProducerId with current producer ID and epoch {} in order to bump the epoch", producerIdAndEpoch);
>>         }
>>         InitProducerIdRequestData requestData = new
>> InitProducerIdRequestData()
>>                 .setTransactionalId(transactionalId)
>>                 .setTransactionTimeoutMs(transactionTimeoutMs)
>>                 .setProducerId(producerIdAndEpoch.producerId)
>>                 .setProducerEpoch(producerIdAndEpoch.epoch);
>>         InitProducerIdHandler handler = new InitProducerIdHandler(new
>> InitProducerIdRequest.Builder(requestData),
>>                 isEpochBump);
>>         enqueueRequest(handler);
>>         return handler.result;
>>     }, State.INITIALIZING);
>> }
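>>
>> A possible fix, sketched here only as an illustration and not necessarily
>> the patch Flink eventually shipped (getField/setField are the connector's
>> own reflection helpers): have resumeTransaction replace the whole
>> producerIdAndEpoch field with a fresh instance instead of mutating the
>> object the field currently points to, so the shared constant is never
>> written:
>>
>> Object transactionManager = getField(kafkaProducer, "transactionManager");
>> synchronized (transactionManager) {
>>     // A brand-new instance: ProducerIdAndEpoch.NONE is never touched.
>>     setField(transactionManager, "producerIdAndEpoch",
>>             new ProducerIdAndEpoch(producerId, epoch));
>> }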
>>
>> On Wed, Jun 2, 2021 at 3:55 PM Piotr Nowojski <pnowoj...@apache.org>
>> wrote:
>>
>>> Hi,
>>>
>>> I think there is no generic way. If this error has indeed happened after
>>> starting a second job from the same savepoint, or something like that, the
>>> user can change the Sink operator's UID.
>>>
>>> If this is an issue of intentional recovery from an earlier
>>> checkpoint/savepoint, maybe `FlinkKafkaProducer#setLogFailuresOnly` will be
>>> helpful.
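>>>
>>> For reference, both suggestions in code (a sketch that assumes a
>>> DataStream job with an existing FlinkKafkaProducer instance named
>>> kafkaProducer):
>>>
>>> // New UID: the sink is treated as a fresh operator on restore, so no old
>>> // transactional state is mapped onto it (restoring then requires allowing
>>> // non-restored state).
>>> stream.addSink(kafkaProducer).uid("kafka-sink-v2");
>>>
>>> // Alternatively, log commit failures instead of failing the job:
>>> kafkaProducer.setLogFailuresOnly(true);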
>>>
>>> Best, Piotrek
>>>
>>> wt., 1 cze 2021 o 15:16 Till Rohrmann <trohrm...@apache.org> napisał(a):
>>>
>>>> The error message says that we are trying to reuse a transaction id
>>>> that is
>>>> currently being used or has expired.
>>>>
>>>> I am not 100% sure how this can happen. My suspicion is that you have
>>>> resumed a job multiple times from the same savepoint. Have you checked
>>>> that
>>>> there is no other job which has been resumed from the same savepoint and
>>>> which is currently running or has run and completed checkpoints?
>>>>
>>>> @pnowojski <pnowoj...@apache.org> @Becket Qin <becket....@gmail.com> how
>>>> does the transaction id generation ensure that we don't get a clash of
>>>> transaction ids if we resume the same job multiple times from the same
>>>> savepoint? From the code, I do see that we have a
>>>> TransactionalIdsGenerator which is initialized with the taskName and the
>>>> operator UID.
>>>>
>>>> fyi: @Arvid Heise <ar...@apache.org>
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>>
>>>> On Mon, May 31, 2021 at 11:10 AM 周瑞 <rui.z...@woqutech.com> wrote:
>>>>
>>>> > Hi:
>>>> >       When "sink.semantic = exactly-once" is set, the following
>>>> > exception is thrown when recovering from a savepoint:
>>>> >
>>>> >        public static final String KAFKA_TABLE_FORMAT =
>>>> >             "CREATE TABLE "+TABLE_NAME+" (\n" +
>>>> >                     "  "+COLUMN_NAME+" STRING\n" +
>>>> >                     ") WITH (\n" +
>>>> >                     "   'connector' = 'kafka',\n" +
>>>> >                     "   'topic' = '%s',\n" +
>>>> >                     "   'properties.bootstrap.servers' = '%s',\n" +
>>>> >                     "   'sink.semantic' = 'exactly-once',\n" +
>>>> >                     "   'properties.transaction.timeout.ms' = '900000',\n" +
>>>> >                     "   'sink.partitioner' = 'com.woqutench.qmatrix.cdc.extractor.PkPartitioner',\n" +
>>>> >                     "   'format' = 'dbz-json'\n" +
>>>> >                     ")\n";
>>>> >   [] - Source: TableSourceScan(table=[[default_catalog, default_database,
>>>> > debezium_source]], fields=[data]) -> Sink: Sink
>>>> > (table=[default_catalog.default_database.KafkaTable], fields=[data])
>>>> > (1/1)#859 (075273be72ab01bf1afd3c066876aaa6) switched from INITIALIZING
>>>> > to FAILED with failure cause: org.apache.kafka.common.KafkaException:
>>>> > Unexpected error in InitProducerIdResponse; Producer attempted an
>>>> > operation with an old epoch. Either there is a newer producer with the
>>>> > same transactionalId, or the producer's transaction has been expired by
>>>> > the broker.
>>>> >     at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352)
>>>> >     at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260)
>>>> >     at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>>>> >     at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:572)
>>>> >     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
>>>> >     at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414)
>>>> >     at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312)
>>>> >     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
>>>> >     at java.lang.Thread.run(Thread.java:748)
>>>> >
>>>>
>>>
