I encountered the exact same issue before when experimenting in a testing environment. I was not able to spot the bug discussed in this thread; the workaround I used was to downgrade my own kafka-clients version from 2.5 to 2.4.1, matching the version that flink-connector-kafka is built against. In Kafka 2.4.1 the TransactionManager initializes producerIdAndEpoch with a fresh instance,

this.producerIdAndEpoch = new ProducerIdAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH);

instead of

this.producerIdAndEpoch = ProducerIdAndEpoch.NONE;
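To make the difference concrete, here is a minimal, self-contained sketch with simplified stand-in classes (not the real Kafka classes; the real ProducerIdAndEpoch fields are final and are written via reflection) showing why it matters whether the field aliases the shared constant:

// Simplified stand-ins for ProducerIdAndEpoch / TransactionManager, only to
// illustrate the aliasing problem; field names mirror the real classes.
class IdAndEpoch {
    static final IdAndEpoch NONE = new IdAndEpoch(-1L, (short) -1);
    long producerId;
    short epoch;
    IdAndEpoch(long producerId, short epoch) { this.producerId = producerId; this.epoch = epoch; }
}

class Manager {
    // kafka-clients 2.5 style: the field aliases the shared constant.
    // kafka-clients 2.4.1 style would be: new IdAndEpoch(-1L, (short) -1) -- a private copy.
    IdAndEpoch producerIdAndEpoch = IdAndEpoch.NONE;
}

public class AliasingDemo {
    public static void main(String[] args) {
        Manager m = new Manager();
        // What resumeTransaction effectively does via reflection:
        m.producerIdAndEpoch.producerId = 4711L;   // writes through the alias
        m.producerIdAndEpoch.epoch = 27;
        // The "constant" seen by every other producer in the same JVM is now polluted:
        System.out.println(IdAndEpoch.NONE.producerId + " / " + IdAndEpoch.NONE.epoch); // 4711 / 27
    }
}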
On Thu, Jun 3, 2021 at 12:11 AM Till Rohrmann <trohrm...@apache.org> wrote:

> Thanks for the update. Skimming over the code it looks indeed that we are
> overwriting the values of the static value ProducerIdAndEpoch.NONE. I am
> not 100% sure how this will cause the observed problem, though. I am also
> not a Flink Kafka connector and Kafka expert, so I would appreciate it if
> someone more familiar could double check this part of the code.
>
> Concerning the required changing of the UID of an operator, Piotr, is this
> a known issue and documented somewhere? I find this rather surprising from
> a user's point of view.
>
> Cheers,
> Till
>
> On Thu, Jun 3, 2021 at 8:53 AM Till Rohrmann <trohrm...@apache.org> wrote:
>
>> Forwarding 周瑞's message to a duplicate thread:
>>
>> After our analysis, we found a bug in the
>> org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.resumeTransaction
>> method. The analysis process is as follows:
>>
>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction
>>
>> public void initializeState(FunctionInitializationContext context) throws Exception {
>>     state = context.getOperatorStateStore().getListState(stateDescriptor);
>>     boolean recoveredUserContext = false;
>>     if (context.isRestored()) {
>>         LOG.info("{} - restoring state", name());
>>         for (State<TXN, CONTEXT> operatorState : state.get()) {
>>             userContext = operatorState.getContext();
>>             List<TransactionHolder<TXN>> recoveredTransactions =
>>                     operatorState.getPendingCommitTransactions();
>>             List<TXN> handledTransactions = new ArrayList<>(recoveredTransactions.size() + 1);
>>             for (TransactionHolder<TXN> recoveredTransaction : recoveredTransactions) {
>>                 // If this fails to succeed eventually, there is actually data loss
>>                 recoverAndCommitInternal(recoveredTransaction);
>>                 handledTransactions.add(recoveredTransaction.handle);
>>                 LOG.info("{} committed recovered transaction {}", name(), recoveredTransaction);
>>             }
>>
>>             {
>>                 TXN transaction = operatorState.getPendingTransaction().handle;
>>                 recoverAndAbort(transaction);
>>                 handledTransactions.add(transaction);
>>                 LOG.info(
>>                         "{} aborted recovered transaction {}",
>>                         name(),
>>                         operatorState.getPendingTransaction());
>>             }
>>
>>             if (userContext.isPresent()) {
>>                 finishRecoveringContext(handledTransactions);
>>                 recoveredUserContext = true;
>>             }
>>         }
>>     }
>> }
>>
>> (1) recoverAndCommitInternal(recoveredTransaction);
>> The transactionalId, producerId and epoch previously stored in the state are
>> used to commit the transaction. However, the producerIdAndEpoch field of the
>> TransactionManager is initialized to the static constant ProducerIdAndEpoch.NONE,
>> so writing the recovered producerId and epoch into that object (see
>> resumeTransaction below) pollutes the static constant ProducerIdAndEpoch.NONE.
>>
>> @Override
>> protected void recoverAndCommit(FlinkKafkaProducer.KafkaTransactionState transaction) {
>>     if (transaction.isTransactional()) {
>>         FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
>>         try {
>>             producer = initTransactionalProducer(transaction.transactionalId, false);
>>             producer.resumeTransaction(transaction.producerId, transaction.epoch);
>>             producer.commitTransaction();
>>         } catch (InvalidTxnStateException | ProducerFencedException ex) {
>>             // That means we have committed this transaction before.
>>             LOG.warn(
>>                     "Encountered error {} while recovering transaction {}. "
>>                             + "Presumably this transaction has been already committed before",
>>                     ex,
>>                     transaction);
>>         } finally {
>>             if (producer != null) {
>>                 producer.close(0, TimeUnit.SECONDS);
>>             }
>>         }
>>     }
>> }
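For readers not familiar with the reflection helpers that the resumeTransaction code quoted next relies on, this is roughly what getField/setField do. A simplified sketch only, with hypothetical error handling; the actual FlinkKafkaInternalProducer helpers differ in details:

import java.lang.reflect.Field;

// Simplified sketches of the reflection helpers used by resumeTransaction.
final class ReflectionUtil {
    static Object getField(Object object, String fieldName) {
        try {
            Field field = object.getClass().getDeclaredField(fieldName);
            field.setAccessible(true);            // access private Kafka internals
            return field.get(object);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("Incompatible KafkaProducer version", e);
        }
    }

    static void setField(Object object, String fieldName, Object value) {
        try {
            Field field = object.getClass().getDeclaredField(fieldName);
            field.setAccessible(true);            // also allows writing non-static final fields
            field.set(object, value);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("Incompatible KafkaProducer version", e);
        }
    }
}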
" >> + "Presumably this transaction has been >> already committed before", >> ex, >> transaction); >> } finally { >> if (producer != null) { >> producer.close(0, TimeUnit.SECONDS); >> } >> } >> } >> } >> >> public void resumeTransaction(long producerId, short epoch) { >> synchronized (producerClosingLock) { >> ensureNotClosed(); >> Preconditions.checkState( >> producerId >= 0 && epoch >= 0, >> "Incorrect values for producerId %s and epoch %s", >> producerId, >> epoch); >> LOG.info( >> "Attempting to resume transaction {} with producerId {} >> and epoch {}", >> transactionalId, >> producerId, >> epoch); >> >> Object transactionManager = getField(kafkaProducer, >> "transactionManager"); >> synchronized (transactionManager) { >> Object topicPartitionBookkeeper = >> getField(transactionManager, >> "topicPartitionBookkeeper"); >> >> invoke( >> transactionManager, >> "transitionTo", >> getEnum( >> >> "org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING")); >> invoke(topicPartitionBookkeeper, "reset"); >> >> Object producerIdAndEpoch = getField(transactionManager, >> "producerIdAndEpoch"); >> setField(producerIdAndEpoch, "producerId", producerId); >> setField(producerIdAndEpoch, "epoch", epoch); >> >> invoke( >> transactionManager, >> "transitionTo", >> getEnum( >> >> "org.apache.kafka.clients.producer.internals.TransactionManager$State.READY")); >> >> invoke( >> transactionManager, >> "transitionTo", >> getEnum( >> >> "org.apache.kafka.clients.producer.internals.TransactionManager$State.IN_TRANSACTION")); >> setField(transactionManager, "transactionStarted", true); >> } >> } >> } >> >> >> public TransactionManager(LogContext logContext, >> String transactionalId, >> int transactionTimeoutMs, >> long retryBackoffMs, >> ApiVersions apiVersions) { >> this.producerIdAndEpoch = ProducerIdAndEpoch.NONE; >> this.transactionalId = transactionalId; >> this.log = logContext.logger(TransactionManager.class); >> this.transactionTimeoutMs = transactionTimeoutMs; >> this.transactionCoordinator = null; >> this.consumerGroupCoordinator = null; >> this.newPartitionsInTransaction = new HashSet<>(); >> this.pendingPartitionsInTransaction = new HashSet<>(); >> this.partitionsInTransaction = new HashSet<>(); >> this.pendingRequests = new PriorityQueue<>(10, >> Comparator.comparingInt(o -> o.priority().priority)); >> this.pendingTxnOffsetCommits = new HashMap<>(); >> this.partitionsWithUnresolvedSequences = new HashMap<>(); >> this.partitionsToRewriteSequences = new HashSet<>(); >> this.retryBackoffMs = retryBackoffMs; >> this.topicPartitionBookkeeper = new TopicPartitionBookkeeper(); >> this.apiVersions = apiVersions; >> } >> >> >> >> public class ProducerIdAndEpoch { >> public static final ProducerIdAndEpoch NONE = new >> ProducerIdAndEpoch(RecordBatch.NO_PRODUCER_ID, >> RecordBatch.NO_PRODUCER_EPOCH); >> >> public final long producerId; >> public final short epoch; >> >> public ProducerIdAndEpoch(long producerId, short epoch) { >> this.producerId = producerId; >> this.epoch = epoch; >> } >> >> public boolean isValid() { >> return RecordBatch.NO_PRODUCER_ID < producerId; >> } >> >> @Override >> public String toString() { >> return "(producerId=" + producerId + ", epoch=" + epoch + ")"; >> } >> >> @Override >> public boolean equals(Object o) { >> if (this == o) return true; >> if (o == null || getClass() != o.getClass()) return false; >> >> ProducerIdAndEpoch that = (ProducerIdAndEpoch) o; >> >> if (producerId != that.producerId) return false; >> return epoch == that.epoch; >> } >> >> 
>> (2) In the second step, recoverAndAbort(FlinkKafkaProducer.KafkaTransactionState transaction),
>> the transaction is initialized again. Because the producerId and epoch written in the first
>> step have polluted ProducerIdAndEpoch.NONE, the initialization request sent to Kafka carries
>> the producerId and epoch of the first committed transaction in its ProducerIdAndEpoch.NONE
>> parameter instead of -1, -1. Kafka therefore throws an exception:
>>
>> Unexpected error in InitProducerIdResponse; Producer attempted an operation with an old epoch.
>> Either there is a newer producer with the same transactionalId, or the producer's transaction
>> has been expired by the broker.
>>     at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352)
>>     at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260)
>>     at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>>     at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:572)
>>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
>>     at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414)
>>     at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312)
>>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
>>     at java.lang.Thread.run(Thread.java:748)
>>
>> protected void recoverAndAbort(FlinkKafkaProducer.KafkaTransactionState transaction) {
>>     if (transaction.isTransactional()) {
>>         FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
>>         try {
>>             producer = initTransactionalProducer(transaction.transactionalId, false);
>>             producer.initTransactions();
>>         } finally {
>>             if (producer != null) {
>>                 producer.close(0, TimeUnit.SECONDS);
>>             }
>>         }
>>     }
>> }
>>
>> public synchronized TransactionalRequestResult initializeTransactions() {
>>     return initializeTransactions(ProducerIdAndEpoch.NONE);
>> }
>>
>> synchronized TransactionalRequestResult initializeTransactions(ProducerIdAndEpoch producerIdAndEpoch) {
>>     boolean isEpochBump = producerIdAndEpoch != ProducerIdAndEpoch.NONE;
>>     return handleCachedTransactionRequestResult(() -> {
>>         // If this is an epoch bump, we will transition the state as part of handling the EndTxnRequest
>>         if (!isEpochBump) {
>>             transitionTo(State.INITIALIZING);
>>             log.info("Invoking InitProducerId for the first time in order to acquire a producer ID");
>>         } else {
>>             log.info("Invoking InitProducerId with current producer ID and epoch {} in order to bump the epoch", producerIdAndEpoch);
>>         }
>>         InitProducerIdRequestData requestData = new InitProducerIdRequestData()
>>                 .setTransactionalId(transactionalId)
>>                 .setTransactionTimeoutMs(transactionTimeoutMs)
>>                 .setProducerId(producerIdAndEpoch.producerId)
>>                 .setProducerEpoch(producerIdAndEpoch.epoch);
>>         InitProducerIdHandler handler = new InitProducerIdHandler(
>>                 new InitProducerIdRequest.Builder(requestData), isEpochBump);
>>         enqueueRequest(handler);
>>         return handler.result;
>>     }, State.INITIALIZING);
>> }
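Spelled out with made-up example values (4711/27 standing in for whatever producerId/epoch were restored in step (1)), the second step then looks like this:

// Hypothetical trace, assuming step (1) reflectively wrote producerId=4711, epoch=27
// into the object referenced by ProducerIdAndEpoch.NONE:
//
// recoverAndAbort()
//   -> producer.initTransactions()
//   -> TransactionManager.initializeTransactions(ProducerIdAndEpoch.NONE)
//        isEpochBump = (producerIdAndEpoch != ProducerIdAndEpoch.NONE) = false  // reference equality, still the same object
//        requestData.producerId    = 4711   // expected NO_PRODUCER_ID    (-1)
//        requestData.producerEpoch = 27     // expected NO_PRODUCER_EPOCH (-1)
//
// The broker sees a non-fresh producerId/epoch that does not match a live producer
// and answers with "Producer attempted an operation with an old epoch ...".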
>> On Wed, Jun 2, 2021 at 3:55 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
>>
>>> Hi,
>>>
>>> I think there is no generic way. If this error has indeed happened after
>>> starting a second job from the same savepoint, or something like that, the
>>> user can change the Sink's operator UID.
>>>
>>> If this is an issue of intentional recovery from an earlier
>>> checkpoint/savepoint, maybe `FlinkKafkaProducer#setLogFailuresOnly` will be
>>> helpful.
>>>
>>> Best, Piotrek
>>>
>>> On Tue, 1 Jun 2021 at 15:16, Till Rohrmann <trohrm...@apache.org> wrote:
>>>
>>>> The error message says that we are trying to reuse a transaction id that is
>>>> currently being used or has expired.
>>>>
>>>> I am not 100% sure how this can happen. My suspicion is that you have
>>>> resumed a job multiple times from the same savepoint. Have you checked that
>>>> there is no other job which has been resumed from the same savepoint and
>>>> which is currently running or has run and completed checkpoints?
>>>>
>>>> @pnowojski <pnowoj...@apache.org> @Becket Qin <becket....@gmail.com> how
>>>> does the transaction id generation ensure that we don't have a clash of
>>>> transaction ids if we resume the same job multiple times from the same
>>>> savepoint? From the code, I do see that we have a TransactionalIdsGenerator
>>>> which is initialized with the taskName and the operator UID.
>>>>
>>>> fyi: @Arvid Heise <ar...@apache.org>
>>>>
>>>> Cheers,
>>>> Till
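For what it's worth, a strongly simplified, hypothetical sketch of such a deterministic id scheme shows why two jobs restored from the same savepoint can end up with clashing transactional ids; the real TransactionalIdsGenerator manages a pool of ids per subtask and is more involved:

// Hypothetical sketch: the transactional id is a pure function of the job graph
// (task name / operator UID) and the subtask, so a second job started from the
// same savepoint with unchanged UIDs derives exactly the same ids.
class TransactionalIdSketch {
    static String transactionalId(String taskNameWithOperatorUid, int subtaskIndex, int counter) {
        return taskNameWithOperatorUid + "-" + subtaskIndex + "-" + counter;
    }

    public static void main(String[] args) {
        // Two "jobs" restored from the same savepoint compute the same id:
        System.out.println(transactionalId("Sink: KafkaTable-my-sink-uid", 0, 3));
        System.out.println(transactionalId("Sink: KafkaTable-my-sink-uid", 0, 3));
    }
}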
>>>> On Mon, May 31, 2021 at 11:10 AM 周瑞 <rui.z...@woqutech.com> wrote:
>>>>
>>>> > Hi:
>>>> >     When "sink.semantic = exactly-once", the following exception is
>>>> > thrown when recovering from a savepoint:
>>>> >
>>>> > public static final String KAFKA_TABLE_FORMAT =
>>>> >         "CREATE TABLE "+TABLE_NAME+" (\n" +
>>>> >         "  "+COLUMN_NAME+" STRING\n" +
>>>> >         ") WITH (\n" +
>>>> >         "  'connector' = 'kafka',\n" +
>>>> >         "  'topic' = '%s',\n" +
>>>> >         "  'properties.bootstrap.servers' = '%s',\n" +
>>>> >         "  'sink.semantic' = 'exactly-once',\n" +
>>>> >         "  'properties.transaction.timeout.ms' = '900000',\n" +
>>>> >         "  'sink.partitioner' = 'com.woqutench.qmatrix.cdc.extractor.PkPartitioner',\n" +
>>>> >         "  'format' = 'dbz-json'\n" +
>>>> >         ")\n";
>>>> >
>>>> > [] - Source: TableSourceScan(table=[[default_catalog, default_database,
>>>> > debezium_source]], fields=[data]) -> Sink: Sink
>>>> > (table=[default_catalog.default_database.KafkaTable], fields=[data]) (1/1)#859
>>>> > (075273be72ab01bf1afd3c066876aaa6) switched from INITIALIZING to FAILED with
>>>> > failure cause: org.apache.kafka.common.KafkaException: Unexpected error in
>>>> > InitProducerIdResponse; Producer attempted an operation with an old epoch.
>>>> > Either there is a newer producer with the same transactionalId, or the
>>>> > producer's transaction has been expired by the broker.
>>>> >     at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352)
>>>> >     at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260)
>>>> >     at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>>>> >     at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:572)
>>>> >     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
>>>> >     at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414)
>>>> >     at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312)
>>>> >     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
>>>> >     at java.lang.Thread.run(Thread.java:748)
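For completeness, this is roughly how Piotrek's two suggestions could be applied with the DataStream API (the SQL connector used in the report above does not expose these knobs directly, as far as I know). Topic name, UID string and the serialization schema below are placeholders:

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;

class ExactlyOnceSinkWorkarounds {

    // Attaches an exactly-once Kafka sink with the two knobs mentioned above.
    static void addKafkaSink(DataStream<String> stream,
                             KafkaSerializationSchema<String> schema,
                             Properties kafkaProps) {
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
                "my-topic", schema, kafkaProps, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

        // Option 1 (intentional recovery from an older checkpoint/savepoint):
        // log commit failures instead of failing the job.
        producer.setLogFailuresOnly(true);

        // Option 2 (a second job was started from the same savepoint):
        // give the sink a new UID so that fresh transactional ids are derived.
        stream.addSink(producer).uid("kafka-sink-v2").name("kafka-sink");
    }
}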