Hi Becket,

One more thing: I also tried restarting the brokers that do not host the
active controller, and this exception can still occur. So it does seem to be
independent of the active controller, as you said.

Best,
Tony Wei

On Wed, Sep 18, 2019 at 6:14 PM Tony Wei <tony19920...@gmail.com> wrote:

> Hi Becket,
>
> I have reproduced this problem in our development environment. Below are
> the log messages at debug level.
> It seems that the exception came from broker-3, and I also found another
> error code on broker-2 around the same time.
>
> There are other INVALID_TXN_STATE errors for other transactional ids; I
> list only one of them. The log messages below only include entries that
> contain the substring `kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's`
> and were logged before `2019-09-18 07:14`.
>
> I didn't find any other information that explains why the producer tried
> to move the transaction state from EMPTY to COMMIT, or what caused
> NOT_COORDINATOR to be returned. Do you have any thoughts on what is
> happening? Thanks.
>
> *Number of Kafka brokers: 3*
> *logging config for kafka:*
>
>> log4j.appender.transactionAppender=org.apache.log4j.RollingFileAppender
>>
>> log4j.appender.transactionAppender.File=${kafka.logs.dir}/kafka-transaction.log
>> log4j.appender.transactionAppender.layout=org.apache.log4j.PatternLayout
>> log4j.appender.transactionAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
>> log4j.appender.transactionAppender.MaxFileSize=10MB
>> log4j.appender.transactionAppender.MaxBackupIndex=10
>> log4j.logger.kafka.coordinator.transaction=DEBUG, transactionAppender
>> log4j.additivity.kafka.coordinator.transaction=true
>>
>
>
> *flink-ui*
>>
>> Timestamp: 2019-09-18, 07:13:43
>>
>
>
> java.lang.RuntimeException: Error while confirming checkpoint
>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218)
>>     at
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>     at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>     at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of
>> transactions failed, logging first encountered failure
>>     at
>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296)
>>     at
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>>     at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684)
>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213)
>>     ... 5 more
>> Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The
>> producer attempted a transactional operation in an invalid state
>>
>
>
> *broker-3*
>>
>> [2019-09-18 07:13:43,768] DEBUG [TransactionCoordinator id=3]
>> TransactionalId: blacklist -> Sink:
>> kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's state is Empty, but
>> received transaction marker result to send: COMMIT
>> (kafka.coordinator.transaction.TransactionCoordinator)
>> [2019-09-18 07:13:43,769] DEBUG [TransactionCoordinator id=3] Aborting
>> append of COMMIT to transaction log with coordinator and returning
>> INVALID_TXN_STATE error to client for blacklist -> Sink:
>> kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's EndTransaction request
>> (kafka.coordinator.transaction.TransactionCoordinator)
>> [2019-09-18 07:13:45,896] DEBUG [TransactionCoordinator id=3]
>> TransactionalId: blacklist -> Sink:
>> kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's state is Empty, but
>> received transaction marker result to send: COMMIT
>> (kafka.coordinator.transaction.TransactionCoordinator)
>> [2019-09-18 07:13:45,896] DEBUG [TransactionCoordinator id=3] Aborting
>> append of COMMIT to transaction log with coordinator and returning
>> INVALID_TXN_STATE error to client for blacklist -> Sink:
>> kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's EndTransaction request
>> (kafka.coordinator.transaction.TransactionCoordinator)
>> [2019-09-18 07:13:46,840] DEBUG [Transaction State Manager 3]: Updating
>> blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's
>> transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=4,
>> txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(),
>> txnStartTimestamp=-1, txnLastUpdateTimestamp=1568790826831) with
>> coordinator epoch 4 for blacklist -> Sink:
>> kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded
>> (kafka.coordinator.transaction.TransactionStateManager)
>>
>
> *broker-2*
>
>> [2019-09-18 06:45:26,324] DEBUG [Transaction State Manager 2]: Updating
>> blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's
>> transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=0,
>> txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(),
>> txnStartTimestamp=-1, txnLastUpdateTimestamp=1568789126318) with
>> coordinator epoch 0 for blacklist -> Sink:
>> kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded
>> (kafka.coordinator.transaction.TransactionStateManager)
>> [2019-09-18 06:54:27,981] DEBUG [Transaction State Manager 2]: Updating
>> blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's
>> transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=1,
>> txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(),
>> txnStartTimestamp=-1, txnLastUpdateTimestamp=1568789667979) with
>> coordinator epoch 0 for blacklist -> Sink:
>> kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded
>> (kafka.coordinator.transaction.TransactionStateManager)
>> [2019-09-18 07:06:25,419] DEBUG [Transaction State Manager 2]: Updating
>> blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's
>> transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=2,
>> txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(),
>> txnStartTimestamp=-1, txnLastUpdateTimestamp=1568790385417) with
>> coordinator epoch 0 for blacklist -> Sink:
>> kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded
>> (kafka.coordinator.transaction.TransactionStateManager)
>> [2019-09-18 07:11:42,981] DEBUG [Transaction State Manager 2]: Updating
>> blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's
>> transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=3,
>> txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(),
>> txnStartTimestamp=-1, txnLastUpdateTimestamp=1568790702969) with
>> coordinator epoch 0 for blacklist -> Sink:
>> kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded
>> (kafka.coordinator.transaction.TransactionStateManager)
>> [2019-09-18 07:13:42,779] DEBUG [TransactionCoordinator id=2] Returning
>> NOT_COORDINATOR error code to client for blacklist -> Sink:
>> kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's AddPartitions request
>> (kafka.coordinator.transaction.TransactionCoordinator)
>> [2019-09-18 07:13:43,633] DEBUG [TransactionCoordinator id=2] Aborting
>> append of COMMIT to transaction log with coordinator and returning
>> NOT_COORDINATOR error to client for blacklist -> Sink:
>> kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's EndTransaction request
>> (kafka.coordinator.transaction.TransactionCoordinator)
>>
>
> Best,
> Tony Wei
>
>
> On Mon, Sep 2, 2019 at 10:03 PM Becket Qin <becket....@gmail.com> wrote:
>
>> Hi Tony,
>>
>> From the symptom it is not quite clear to me what may cause this issue.
>> Supposedly the TransactionCoordinator is independent of the active
>> controller, so bouncing the active controller should not have special
>> impact on the transactions (at least not every time). If this is reliably
>> reproducible, is it possible to turn on debug level logging
>> on kafka.coordinator.transaction.TransactionCoordinator to see what
>> the broker says?
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Thu, Aug 29, 2019 at 3:55 PM Tony Wei <tony19920...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Has anyone run into the same problem? I have raised my producer
>>> transaction timeout to 1.5 hours,
>>> but the problem still happened when I restarted the broker hosting the
>>> active controller. So it is probably not caused by a long checkpoint
>>> duration leading to a transaction timeout. I have no further clues
>>> about what is wrong with my Kafka producer. Could someone help me, please?
>>>
>>> Best,
>>> Tony Wei
>>>
>>> On Fri, Aug 16, 2019 at 4:10 PM Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Hi Tony,
>>>>
>>>> I'm sorry I cannot help you with this issue, but Becket (in CC) might
>>>> have an idea what went wrong here.
>>>>
>>>> Best, Fabian
>>>>
>>>> On Wed, Aug 14, 2019 at 07:00, Tony Wei <
>>>> tony19920...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I am currently trying to update our Kafka cluster to use a larger
>>>>> `transaction.max.timeout.ms`. The original setting was Kafka's
>>>>> default value (i.e. 15 minutes), and I tried to set it to 3 hours.
>>>>>
>>>>> While I was doing a rolling restart of my brokers, this exception
>>>>> occurred on the next checkpoint after I restarted the broker hosting
>>>>> the active controller.
>>>>>
>>>>> java.lang.RuntimeException: Error while confirming checkpoint
>>>>>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218)
>>>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>> Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of
>>>>>> transactions failed, logging first encountered failure
>>>>>>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296)
>>>>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684)
>>>>>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213)
>>>>>>     ... 5 more
>>>>>> Caused by: org.apache.kafka.common.errors.InvalidTxnStateException:
>>>>>> The producer attempted a transactional operation in an invalid state
>>>>>
>>>>>
>>>>> I have no idea why it happened, and I didn't find any error logs on
>>>>> the brokers. Has anyone seen this exception before? How can I avoid
>>>>> this exception when I restart the Kafka cluster?
>>>>> Does this exception mean that I will lose data in some of these
>>>>> transactions?
>>>>>
>>>>> flink cluster version: 1.8.1
>>>>> kafka cluster version: 1.0.1
>>>>> flink kafka producer version: universal
>>>>> producer transaction timeout: 15 minutes
>>>>> checkpoint interval: 5 minutes
>>>>> number of concurrent checkpoint: 1
>>>>> max checkpoint duration before and after the exception occurred: < 2
>>>>> minutes
>>>>>
>>>>> Best,
>>>>> Tony Wei
>>>>>
>>>>
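
The broker-2 and broker-3 "Updating ... transaction state" entries quoted
above are easier to compare once the TxnTransitMetadata fields are pulled out
of the wrapped text. Below is a minimal Python sketch for that, assuming the
entries look exactly like the ones pasted in this thread. The regular
expression and the names `TXN_UPDATE` and `parse_txn_update` are illustrative
only; they are not part of Kafka or Flink, and the pattern is not a log format
that Kafka guarantees.

```python
import re
from datetime import datetime, timezone

# Regex written against the debug entries quoted in this thread. It is an
# assumption about their shape, not a format guaranteed by Kafka.
TXN_UPDATE = re.compile(
    r"producerId=(?P<producer_id>\d+), producerEpoch=(?P<producer_epoch>\d+), "
    r"txnTimeoutMs=(?P<timeout_ms>\d+), txnState=(?P<state>\w+), "
    r".*?txnLastUpdateTimestamp=(?P<updated_ms>\d+)\) with "
    r"coordinator epoch (?P<coordinator_epoch>\d+)"
)


def parse_txn_update(entry):
    """Return the fields of one 'Updating ... transaction state' entry, or None."""
    # Drop mail quote markers and re-join lines wrapped by the mail client.
    lines = [re.sub(r"^[>\s]+", "", line) for line in entry.splitlines()]
    text = " ".join(" ".join(lines).split())
    match = TXN_UPDATE.search(text)
    if match is None:
        return None
    fields = {
        key: (value if key == "state" else int(value))
        for key, value in match.groupdict().items()
    }
    # txnLastUpdateTimestamp is in epoch milliseconds; convert it to UTC.
    seconds, millis = divmod(fields["updated_ms"], 1000)
    fields["updated_utc"] = datetime.fromtimestamp(
        seconds, tz=timezone.utc
    ).replace(microsecond=millis * 1000)
    return fields


# The last broker-3 entry from this thread, pasted with its quote markers.
entry = """\
>> [2019-09-18 07:13:46,840] DEBUG [Transaction State Manager 3]: Updating
>> blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's
>> transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=4,
>> txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(),
>> txnStartTimestamp=-1, txnLastUpdateTimestamp=1568790826831) with
>> coordinator epoch 4 for blacklist -> Sink:
>> kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded
"""

fields = parse_txn_update(entry)
print(fields["producer_epoch"], fields["coordinator_epoch"],
      fields["timeout_ms"], fields["updated_utc"].isoformat())
```

For this entry, txnTimeoutMs=5400000 corresponds to the 1.5 hour producer
timeout mentioned earlier, and txnLastUpdateTimestamp decodes to 07:13:46.831
UTC, which lines up with the 07:13:46,840 timestamp on the log line. Running
the same function over the broker-2 entries gives producer epochs 0 to 3, all
with coordinator epoch 0, against producer epoch 4 with coordinator epoch 4 on
broker-3. Note that the sketch re-joins wrapped lines with a space, so an
entry whose token was split mid-word by the mail client will not match.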
