Hi Becket,

I have reproduced this problem in our development environment. Below are the debug-level log messages. It seems the exception came from broker-3, and I also found another error code on broker-2 around the same time.
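For reference, the transaction coordinator only accepts an EndTxn(COMMIT) for a transaction in the Ongoing state. The sketch below is illustrative, not the broker's actual code; the state names follow Kafka's transaction coordinator, but the transition table is simplified:

```python
# Simplified sketch of the coordinator's transaction state machine.
# State names follow Kafka's TransactionState; the transition table is
# illustrative only, not the broker's actual implementation.
VALID_TRANSITIONS = {
    "Empty": {"Ongoing", "Empty"},        # AddPartitions starts a transaction
    "Ongoing": {"PrepareCommit", "PrepareAbort", "Ongoing"},
    "PrepareCommit": {"CompleteCommit"},
    "PrepareAbort": {"CompleteAbort"},
    "CompleteCommit": {"Empty", "Ongoing"},
    "CompleteAbort": {"Empty", "Ongoing"},
}

def end_txn(state: str, result: str) -> str:
    """Handle an EndTxn request; mirrors the INVALID_TXN_STATE check in the logs."""
    target = "PrepareCommit" if result == "COMMIT" else "PrepareAbort"
    if target not in VALID_TRANSITIONS[state]:
        # The path broker-3 logged: "state is Empty, but received
        # transaction marker result to send: COMMIT".
        raise ValueError("INVALID_TXN_STATE")
    return target

print(end_txn("Ongoing", "COMMIT"))  # PrepareCommit: the normal commit path
try:
    end_txn("Empty", "COMMIT")       # what the broker-3 logs show happening
except ValueError as e:
    print(e)                         # INVALID_TXN_STATE
```

One hedged reading of the logs (a hypothesis, not a confirmed diagnosis): each "Updating … txnState=Empty" entry with a bumped producerEpoch looks like a producer re-initialization resetting the transaction back to Empty, so by the time the COMMIT arrived at broker-3 the coordinator no longer had an Ongoing transaction to commit.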
There are other INVALID_TXN_STATE errors for other transaction ids; I list just one of them here. The log messages below only include lines containing the substring `kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7` before `2019-09-18 07:14`. I couldn't find any other information to explain why the producer tried to move the transaction state from EMPTY to COMMIT, or what caused the NOT_COORDINATOR error. Do you have any thoughts about what's happening? Thanks.

*Number of Kafka brokers: 3*

*logging config for kafka:*

> log4j.appender.transactionAppender=org.apache.log4j.RollingFileAppender
> log4j.appender.transactionAppender.File=${kafka.logs.dir}/kafka-transaction.log
> log4j.appender.transactionAppender.layout=org.apache.log4j.PatternLayout
> log4j.appender.transactionAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
> log4j.appender.transactionAppender.MaxFileSize=10MB
> log4j.appender.transactionAppender.MaxBackupIndex=10
> log4j.logger.kafka.coordinator.transaction=DEBUG, transactionAppender
> log4j.additivity.kafka.coordinator.transaction=true

*flink-ui*

> Timestamp: 2019-09-18, 07:13:43
> java.lang.RuntimeException: Error while confirming checkpoint
>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure
>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684)
>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213)
>     ... 5 more
> Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state

*broker-3*

> [2019-09-18 07:13:43,768] DEBUG [TransactionCoordinator id=3] TransactionalId: blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's state is Empty, but received transaction marker result to send: COMMIT (kafka.coordinator.transaction.TransactionCoordinator)
> [2019-09-18 07:13:43,769] DEBUG [TransactionCoordinator id=3] Aborting append of COMMIT to transaction log with coordinator and returning INVALID_TXN_STATE error to client for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's EndTransaction request (kafka.coordinator.transaction.TransactionCoordinator)
> [2019-09-18 07:13:45,896] DEBUG [TransactionCoordinator id=3] TransactionalId: blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's state is Empty, but received transaction marker result to send: COMMIT (kafka.coordinator.transaction.TransactionCoordinator)
> [2019-09-18 07:13:45,896] DEBUG [TransactionCoordinator id=3] Aborting append of COMMIT to transaction log with coordinator and returning INVALID_TXN_STATE error to client for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's EndTransaction request (kafka.coordinator.transaction.TransactionCoordinator)
> [2019-09-18 07:13:46,840] DEBUG [Transaction State Manager 3]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=4, txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1568790826831) with coordinator epoch 4 for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)

*broker-2*

> [2019-09-18 06:45:26,324] DEBUG [Transaction State Manager 2]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=0, txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1568789126318) with coordinator epoch 0 for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)
> [2019-09-18 06:54:27,981] DEBUG [Transaction State Manager 2]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=1, txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1568789667979) with coordinator epoch 0 for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)
> [2019-09-18 07:06:25,419] DEBUG [Transaction State Manager 2]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=2, txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1568790385417) with coordinator epoch 0 for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)
> [2019-09-18 07:11:42,981] DEBUG [Transaction State Manager 2]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=3, txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1568790702969) with coordinator epoch 0 for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)
> [2019-09-18 07:13:42,779] DEBUG [TransactionCoordinator id=2] Returning NOT_COORDINATOR error code to client for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's AddPartitions request (kafka.coordinator.transaction.TransactionCoordinator)
> [2019-09-18 07:13:43,633] DEBUG [TransactionCoordinator id=2] Aborting append of COMMIT to transaction log with coordinator and returning NOT_COORDINATOR error to client for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's EndTransaction request (kafka.coordinator.transaction.TransactionCoordinator)

Best,
Tony Wei

Becket Qin <becket....@gmail.com> wrote on Mon, Sep 2, 2019 at 10:03 PM:

> Hi Tony,
>
> From the symptom it is not quite clear to me what may cause this issue. Supposedly the TransactionCoordinator is independent of the active controller, so bouncing the active controller should not have a special impact on the transactions (at least not every time). If this is stably reproducible, is it possible to turn on debug-level logging on kafka.coordinator.transaction.TransactionCoordinator to see what the broker says?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Thu, Aug 29, 2019 at 3:55 PM Tony Wei <tony19920...@gmail.com> wrote:
>
>> Hi,
>>
>> Has anyone run into the same problem? I have updated my producer transaction timeout to 1.5 hours, but the problem still happened when I restarted the broker with the active controller. So it might not be that a too-long checkpoint duration caused the transaction timeout. I have no more clues to find out what's wrong with my Kafka producer. Could someone help me please?
>>
>> Best,
>> Tony Wei
>>
>> Fabian Hueske <fhue...@gmail.com> wrote on Fri, Aug 16, 2019 at 4:10 PM:
>>
>>> Hi Tony,
>>>
>>> I'm sorry I cannot help you with this issue, but Becket (in CC) might have an idea what went wrong here.
>>>
>>> Best, Fabian
>>>
>>> On Wed, Aug 14, 2019 at 07:00, Tony Wei <tony19920...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Currently, I am trying to update our Kafka cluster with a larger `transaction.max.timeout.ms`. The original setting is Kafka's default value (i.e. 15 minutes), and I tried to set it to 3 hours.
>>>>
>>>> When I was doing a rolling restart of my brokers, this exception hit me on the next checkpoint after I restarted the broker with the active controller.
>>>>
>>>>> java.lang.RuntimeException: Error while confirming checkpoint
>>>>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218)
>>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>> Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure
>>>>>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296)
>>>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684)
>>>>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213)
>>>>>     ... 5 more
>>>>> Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state
>>>>
>>>> I have no idea why it happened, and I didn't find any error log on the brokers. Has anyone seen this exception before? How can I prevent this exception when I restart the Kafka cluster? Does this exception mean that I will lose data in some of these transactions?
>>>>
>>>> flink cluster version: 1.8.1
>>>> kafka cluster version: 1.0.1
>>>> flink kafka producer version: universal
>>>> producer transaction timeout: 15 minutes
>>>> checkpoint interval: 5 minutes
>>>> number of concurrent checkpoints: 1
>>>> max checkpoint duration before and after the exception occurred: < 2 minutes
>>>>
>>>> Best,
>>>> Tony Wei
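The stack traces in this thread fail inside TwoPhaseCommitSinkFunction.notifyCheckpointComplete, i.e. in the commit phase of Flink's two-phase commit. A minimal sketch of that pattern (hypothetical simplified names, not Flink's actual implementation) shows why a broker-side transaction reset between pre-commit and checkpoint confirmation surfaces at exactly that point:

```python
# Sketch of the two-phase-commit sink pattern used by Flink's Kafka sink.
# Names and structure are simplified for illustration; the real class is
# org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.
class TwoPhaseCommitSink:
    def __init__(self):
        self.pending = {}  # checkpoint_id -> open transaction handle

    def begin_transaction(self):
        # Phase 0: open a transaction for the next checkpoint interval.
        return {"state": "Ongoing"}

    def pre_commit(self, txn, checkpoint_id):
        # Phase 1 (on checkpoint): flush, then keep the transaction open
        # until the checkpoint is globally confirmed.
        self.pending[checkpoint_id] = txn

    def notify_checkpoint_complete(self, checkpoint_id):
        # Phase 2: commit only after the checkpoint succeeds. If the broker
        # has meanwhile reset the transaction (e.g. back to Empty), the
        # commit fails here -- analogous to the InvalidTxnStateException
        # wrapped in "Error while confirming checkpoint" above.
        txn = self.pending.pop(checkpoint_id)
        if txn["state"] != "Ongoing":
            raise RuntimeError("Committing one of transactions failed")
        txn["state"] = "CompleteCommit"
        return txn

sink = TwoPhaseCommitSink()
t = sink.begin_transaction()
sink.pre_commit(t, checkpoint_id=42)
print(sink.notify_checkpoint_complete(42)["state"])  # CompleteCommit
```

The design point this illustrates: the commit is deliberately deferred to notifyCheckpointComplete so that a restored job can re-commit pending transactions, which is also why any broker-side loss of the open transaction only becomes visible when the checkpoint is confirmed.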