Hi Becket,

I have reproduced this problem in our development environment. Below are the
log messages at debug level.
It seems the exception came from broker-3, and I also found another error
code on broker-2 around the same time.

There are other INVALID_TXN_STATE errors for other transactional ids; I just
list one of them here. The attached log messages only
show entries containing the substring `kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's`
with timestamps before `2019-09-18 07:14`.

I couldn't find any other information to explain why the producer tried to
move the transaction state from EMPTY to COMMIT, or what
caused the NOT_COORDINATOR error. Do you have any thoughts on what's
happening? Thanks.
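
For reference, my understanding of where that COMMIT comes from is the plain
Kafka transactional-producer sequence sketched below (a minimal sketch, not
our actual sink code; the bootstrap servers, topic, and transactional.id are
placeholders). As far as I know, Flink's TwoPhaseCommitSinkFunction drives the
same API: it flushes on the checkpoint barrier and calls commitTransaction()
from notifyCheckpointComplete(), which becomes the EndTransaction(COMMIT)
request that broker-3 rejected while its state for this transactional id was
still Empty.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TxnFlowSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker-1:9092");       // placeholder
        props.put("transactional.id", "my-transactional-id");  // the Flink sink picks its own ids
        props.put("transaction.timeout.ms", "5400000");        // 1.5 hours, same as txnTimeoutMs in the logs

        try (Producer<String, String> producer =
                 new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
            producer.initTransactions();   // InitProducerId: coordinator bumps the epoch, state stays Empty
            producer.beginTransaction();   // client-side only, nothing is sent to the coordinator yet
            // the first send triggers AddPartitionsToTxn, which moves the state Empty -> Ongoing
            producer.send(new ProducerRecord<>("my-topic", "key", "value"));
            producer.flush();              // the sink pre-commits (flushes) when the checkpoint barrier arrives
            producer.commitTransaction();  // EndTxn(COMMIT): only valid from Ongoing/PrepareCommit,
                                           // hence INVALID_TXN_STATE when the coordinator sees Empty
        }
    }
}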

*Number of Kafka brokers: 3*
*logging config for kafka:*

> log4j.appender.transactionAppender=org.apache.log4j.RollingFileAppender
>
> log4j.appender.transactionAppender.File=${kafka.logs.dir}/kafka-transaction.log
> log4j.appender.transactionAppender.layout=org.apache.log4j.PatternLayout
> log4j.appender.transactionAppender.layout.ConversionPattern=[%d] %p %m
> (%c)%n
> log4j.appender.transactionAppender.MaxFileSize=10MB
> log4j.appender.transactionAppender.MaxBackupIndex=10
> log4j.logger.kafka.coordinator.transaction=DEBUG, transactionAppender
> log4j.additivity.kafka.coordinator.transaction=true
>


*flink-ui*
>
> Timestamp: 2019-09-18, 07:13:43
>


> java.lang.RuntimeException: Error while confirming checkpoint
>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of
> transactions failed, logging first encountered failure
>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684)
>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213)
>     ... 5 more
> Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The
> producer attempted a transactional operation in an invalid state
>


*broker-3*
>
> [2019-09-18 07:13:43,768] DEBUG [TransactionCoordinator id=3]
> TransactionalId: blacklist -> Sink:
> kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's state is Empty, but
> received transaction marker result to send: COMMIT
> (kafka.coordinator.transaction.TransactionCoordinator)
> [2019-09-18 07:13:43,769] DEBUG [TransactionCoordinator id=3] Aborting
> append of COMMIT to transaction log with coordinator and returning
> INVALID_TXN_STATE error to client for blacklist -> Sink:
> kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's EndTransaction request
> (kafka.coordinator.transaction.TransactionCoordinator)
> [2019-09-18 07:13:45,896] DEBUG [TransactionCoordinator id=3]
> TransactionalId: blacklist -> Sink:
> kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's state is Empty, but
> received transaction marker result to send: COMMIT
> (kafka.coordinator.transaction.TransactionCoordinator)
> [2019-09-18 07:13:45,896] DEBUG [TransactionCoordinator id=3] Aborting
> append of COMMIT to transaction log with coordinator and returning
> INVALID_TXN_STATE error to client for blacklist -> Sink:
> kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's EndTransaction request
> (kafka.coordinator.transaction.TransactionCoordinator)
> [2019-09-18 07:13:46,840] DEBUG [Transaction State Manager 3]: Updating
> blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's
> transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=4,
> txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(),
> txnStartTimestamp=-1, txnLastUpdateTimestamp=1568790826831) with
> coordinator epoch 4 for blacklist -> Sink:
> kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded
> (kafka.coordinator.transaction.TransactionStateManager)
>

*broker-2*

> [2019-09-18 06:45:26,324] DEBUG [Transaction State Manager 2]: Updating
> blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's
> transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=0,
> txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(),
> txnStartTimestamp=-1, txnLastUpdateTimestamp=1568789126318) with
> coordinator epoch 0 for blacklist -> Sink:
> kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded
> (kafka.coordinator.transaction.TransactionStateManager)
> [2019-09-18 06:54:27,981] DEBUG [Transaction State Manager 2]: Updating
> blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's
> transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=1,
> txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(),
> txnStartTimestamp=-1, txnLastUpdateTimestamp=1568789667979) with
> coordinator epoch 0 for blacklist -> Sink:
> kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded
> (kafka.coordinator.transaction.TransactionStateManager)
> [2019-09-18 07:06:25,419] DEBUG [Transaction State Manager 2]: Updating
> blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's
> transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=2,
> txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(),
> txnStartTimestamp=-1, txnLastUpdateTimestamp=1568790385417) with
> coordinator epoch 0 for blacklist -> Sink:
> kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded
> (kafka.coordinator.transaction.TransactionStateManager)
> [2019-09-18 07:11:42,981] DEBUG [Transaction State Manager 2]: Updating
> blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's
> transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=3,
> txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(),
> txnStartTimestamp=-1, txnLastUpdateTimestamp=1568790702969) with
> coordinator epoch 0 for blacklist -> Sink:
> kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded
> (kafka.coordinator.transaction.TransactionStateManager)
> [2019-09-18 07:13:42,779] DEBUG [TransactionCoordinator id=2] Returning
> NOT_COORDINATOR error code to client for blacklist -> Sink:
> kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's AddPartitions request
> (kafka.coordinator.transaction.TransactionCoordinator)
> [2019-09-18 07:13:43,633] DEBUG [TransactionCoordinator id=2] Aborting
> append of COMMIT to transaction log with coordinator and returning
> NOT_COORDINATOR error to client for blacklist -> Sink:
> kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's EndTransaction request
> (kafka.coordinator.transaction.TransactionCoordinator)
>

Best,
Tony Wei


Becket Qin <becket....@gmail.com> wrote on Mon, Sep 2, 2019 at 10:03 PM:

> Hi Tony,
>
> From the symptom it is not quite clear to me what may cause this issue.
> Supposedly the TransactionCoordinator is independent of the active
> controller, so bouncing the active controller should not have special
> impact on the transactions (at least not every time). If this is reliably
> reproducible, is it possible to turn on debug-level logging
> on kafka.coordinator.transaction.TransactionCoordinator to see what the
> broker says?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Thu, Aug 29, 2019 at 3:55 PM Tony Wei <tony19920...@gmail.com> wrote:
>
>> Hi,
>>
>> Has anyone run into the same problem? I have updated my producer
>> transaction timeout to 1.5 hours,
>> but the problem still happened when I restarted the broker with the active
>> controller. So it might not be due to the
>> checkpoint duration being too long and causing a transaction timeout.
>> I have no more clues to find out
>> what's wrong with my kafka producer. Could someone help me please?
>>
>> Best,
>> Tony Wei
>>
>> Fabian Hueske <fhue...@gmail.com> wrote on Fri, Aug 16, 2019 at 4:10 PM:
>>
>>> Hi Tony,
>>>
>>> I'm sorry I cannot help you with this issue, but Becket (in CC) might
>>> have an idea what went wrong here.
>>>
>>> Best, Fabian
>>>
>>> On Wed, Aug 14, 2019 at 7:00 AM, Tony Wei <
>>> tony19920...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Recently, I was trying to update our kafka cluster with a larger `
>>>> transaction.max.timeout.ms`. The
>>>> original setting was kafka's default value (i.e. 15 minutes) and I tried
>>>> to set it to 3 hours.
>>>>
>>>> While I was doing a rolling restart of my brokers, this exception
>>>> occurred on the next checkpoint
>>>> after I restarted the broker with the active controller.
>>>>
>>>>> java.lang.RuntimeException: Error while confirming checkpoint
>>>>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218)
>>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>> Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of
>>>>> transactions failed, logging first encountered failure
>>>>>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296)
>>>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684)
>>>>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213)
>>>>>     ... 5 more
>>>>> Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The
>>>>> producer attempted a transactional operation in an invalid state
>>>>
>>>>
>>>> I have no idea why this happened, and I didn't find any error logs on
>>>> the brokers. Has anyone seen
>>>> this exception before? How can I prevent it when I
>>>> restart the kafka cluster?
>>>> Does this exception mean that I will lose data in some of these
>>>> transactions?
>>>>
>>>> flink cluster version: 1.8.1
>>>> kafka cluster version: 1.0.1
>>>> flink kafka producer version: universal
>>>> producer transaction timeout: 15 minutes
>>>> checkpoint interval: 5 minutes
>>>> number of concurrent checkpoint: 1
>>>> max checkpoint duration before and after the exception occurred:  < 2
>>>> minutes
>>>>
>>>> Best,
>>>> Tony Wei
>>>>
>>>
