Thanks Guozhang. Yes, we are hitting the same issue.

I see in that ticket that it’s fixed in 1.1.2, 2.0.1, and 2.1.0, but I don’t 
see any of those versions on the Kafka downloads page 
(https://kafka.apache.org/downloads). How can we get access to one of these 
versions?

Thanks
Aravind
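
In the meantime, as a stopgap until we can move to a fixed release, we are 
considering detecting the dead thread and restarting. Kafka Streams exposes 
KafkaStreams#setUncaughtExceptionHandler, so a supervisor could notice the 
ProducerFencedException killing a StreamThread and rebuild the topology. Below 
is only a minimal sketch of that detect-and-restart pattern using plain Java 
threads (it simulates the fatal error and is not the Kafka API itself; class 
and method names are ours):

```java
// Sketch of the detect-and-restart pattern we have in mind. In Kafka Streams
// the hook would be KafkaStreams#setUncaughtExceptionHandler; here a plain
// worker thread simulates the fatal ProducerFencedException instead.
public class FencedRestartSketch {

    // Runs a worker that always dies with an uncaught exception, restarting it
    // up to maxRestarts times. Returns how many restarts were performed.
    public static int runWithRestarts(int maxRestarts) throws InterruptedException {
        int restarts = 0;
        while (true) {
            final boolean[] died = {false};
            Thread worker = new Thread(() -> {
                // Simulates the fatal error a fenced producer surfaces.
                throw new RuntimeException("simulated ProducerFencedException");
            });
            // Handler runs on the dying thread before it terminates; join()
            // below gives us a happens-before edge, so reading died[] is safe.
            worker.setUncaughtExceptionHandler((t, e) -> died[0] = true);
            worker.start();
            worker.join();
            if (died[0] && restarts < maxRestarts) {
                restarts++; // rebuild and relaunch the worker
            } else {
                return restarts;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("restarts=" + runWithRestarts(2));
    }
}
```

In a real deployment the handler would record the failure and the supervisor 
would close and rebuild the KafkaStreams instance rather than relaunch a bare 
thread; this only shows the shape of the workaround.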

> On Sep 30, 2018, at 1:17 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> 
> Could you take a look at https://issues.apache.org/jira/browse/KAFKA-7284
> and see if you are hitting this?
> 
> Guozhang
> 
> On Fri, Sep 28, 2018 at 5:22 PM, Aravind Dongara <adong...@yahoo.com.invalid
>> wrote:
> 
>> Hi Guozhang
>> 
>> Thanks for your reply.
>> We are using Kafka 1.1.1
>> 
>> Thanks
>> Aravind
>> 
>> 
>>> On Sep 28, 2018, at 4:45 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>> 
>>> Hello Aravind,
>>> 
>>> Which version of Kafka are you currently using? What you described seems
>>> to be fixed in the latest version already, so I want to check if you are
>>> using an older version and, if yes, what's the best way to work around it.
>>> 
>>> 
>>> Guozhang
>>> 
>>> 
>>> On Thu, Sep 27, 2018 at 12:54 PM, Aravind Dongara <
>>> adong...@yahoo.com.invalid> wrote:
>>> 
>>>> 
>>>> During a rebalance triggered by the kafka-coordinator-heartbeat-thread
>>>> losing its connection to the group coordinator, we noticed that a stream
>>>> thread shuts down when it catches a ProducerFencedException while
>>>> flushing the state store.
>>>> This also causes the stream state on that node to be stuck in
>>>> REBALANCING, even though the partitions have been rebalanced to other
>>>> threads across nodes.
>>>> During the rebalance there seems to be a race condition between
>>>> flushState on one node and producer-id creation on another node for the
>>>> same partition; if the flushState is slower, it encounters a
>>>> ProducerFencedException.
>>>> 
>>>> It would be nice if Kafka Streams could handle this exception gracefully
>>>> and not shut down the thread, so that we don’t end up with an uneven
>>>> number of threads across nodes.
>>>> Can you please suggest any workarounds for this situation?
>>>> 
>>>> Thanks
>>>> Aravind
>>>> 
>>>> 
>>>> [2018-09-26T15:39:54,662Z]  [ERROR]  [kafka-producer-network-thread |
>>>> upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-
>>>> 0a4b8d6753a2-StreamThread-16-0_55-producer]
>> [o.a.k.c.producer.internals.Sender]
>>>> [Producer clientId=upsert-merger-stream-oa43-1-53c3713a-443f-4b89-
>>>> a27a-0a4b8d6753a2-StreamThread-16-0_55-producer,
>>>> transactionalId=upsert-merger-stream-oa43-1-0_55] Aborting producer
>>>> batches due to fatal error
>>>> [2018-09-26T15:39:54,665Z]  [ERROR]  [upsert-merger-stream-oa43-1-
>>>> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]  [o.a.k.s.p.i.
>> ProcessorStateManager]
>>>> task [0_55] Failed to flush state store upsert-store:
>>>> org.apache.kafka.streams.errors.StreamsException: task [0_55] Abort
>>>> sending since an error caught with a previous record (key
>>>> de796abe-d174-40ee-99fb-36695447402e value [B@79f4a0fd timestamp
>>>> 1537976392104) to topic upsert-merger-stream-oa43-1-
>> upsert-store-changelog
>>>> due to Cannot perform send because at least one previous transactional
>> or
>>>> idempotent request has failed with errors.
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> RecordCollectorImpl.send(RecordCollectorImpl.java:197)
>>>>       at org.apache.kafka.streams.state.internals.
>>>> StoreChangeLogger.logChange(StoreChangeLogger.java:59)
>>>>       at org.apache.kafka.streams.state.internals.
>>>> ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStor
>>>> e.java:69)
>>>>       at org.apache.kafka.streams.state.internals.
>>>> ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStor
>>>> e.java:29)
>>>>       at org.apache.kafka.streams.state.internals.
>> CachingKeyValueStore.
>>>> putAndMaybeForward(CachingKeyValueStore.java:105)
>>>>       at org.apache.kafka.streams.state.internals.
>>>> CachingKeyValueStore.access$000(CachingKeyValueStore.java:38)
>>>>       at org.apache.kafka.streams.state.internals.
>>>> CachingKeyValueStore$1.apply(CachingKeyValueStore.java:83)
>>>>       at org.apache.kafka.streams.state.internals.NamedCache.
>>>> flush(NamedCache.java:142)
>>>>       at org.apache.kafka.streams.state.internals.NamedCache.
>>>> flush(NamedCache.java:100)
>>>>       at org.apache.kafka.streams.state.internals.ThreadCache.
>>>> flush(ThreadCache.java:127)
>>>>       at org.apache.kafka.streams.state.internals.
>>>> CachingKeyValueStore.flush(CachingKeyValueStore.java:123)
>>>>       at org.apache.kafka.streams.state.internals.
>>>> InnerMeteredKeyValueStore.flush(InnerMeteredKeyValueStore.java:267)
>>>>       at org.apache.kafka.streams.state.internals.
>>>> MeteredKeyValueBytesStore.flush(MeteredKeyValueBytesStore.java:149)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> ProcessorStateManager.flush(ProcessorStateManager.java:243)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> AbstractTask.flushState(AbstractTask.java:195)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamTask.flushState(StreamTask.java:339)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamTask$1.run(StreamTask.java:312)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:211)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamTask.commit(StreamTask.java:307)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamTask.suspend(StreamTask.java:440)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamTask.suspend(StreamTask.java:422)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> AssignedTasks.suspendTasks(AssignedTasks.java:182)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> AssignedTasks.suspend(AssignedTasks.java:147)
>>>>       at org.apache.kafka.streams.processor.internals.TaskManager.
>>>> suspendTasksAndState(TaskManager.java:242)
>>>>       at org.apache.kafka.streams.processor.internals.StreamThread$
>>>> RebalanceListener.onPartitionsRevoked(StreamThread.java:291)
>>>>       at org.apache.kafka.clients.consumer.internals.
>>>> ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:414)
>>>>       at org.apache.kafka.clients.consumer.internals.
>>>> AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:359)
>>>>       at org.apache.kafka.clients.consumer.internals.
>>>> AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
>>>>       at org.apache.kafka.clients.consumer.internals.
>>>> ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>>>>       at org.apache.kafka.clients.consumer.KafkaConsumer.
>>>> pollOnce(KafkaConsumer.java:1149)
>>>>       at org.apache.kafka.clients.consumer.KafkaConsumer.poll(
>>>> KafkaConsumer.java:1115)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamThread.pollRequests(StreamThread.java:831)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamThread.runOnce(StreamThread.java:788)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamThread.runLoop(StreamThread.java:749)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamThread.run(StreamThread.java:719)
>>>> Caused by: org.apache.kafka.common.KafkaException: Cannot perform send
>>>> because at least one previous transactional or idempotent request has
>>>> failed with errors.
>>>>       at org.apache.kafka.clients.producer.internals.
>> TransactionManager.
>>>> failIfNotReadyForSend(TransactionManager.java:279)
>>>>       at org.apache.kafka.clients.producer.internals.
>> TransactionManager.
>>>> maybeAddPartitionToTransaction(TransactionManager.java:264)
>>>>       at org.apache.kafka.clients.producer.KafkaProducer.doSend(
>>>> KafkaProducer.java:828)
>>>>       at org.apache.kafka.clients.producer.KafkaProducer.send(
>>>> KafkaProducer.java:784)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> RecordCollectorImpl.send(RecordCollectorImpl.java:153)
>>>>       ... 34 common frames omitted
>>>> Caused by: org.apache.kafka.common.errors.ProducerFencedException:
>>>> Producer attempted an operation with an old epoch. Either there is a
>> newer
>>>> producer with the same transactionalId, or the producer's transaction
>> has
>>>> been expired by the broker.
>>>> [2018-09-26T15:39:54,665Z]  [ERROR]  [upsert-merger-stream-oa43-1-
>>>> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]  [o.a.k.s.p.i.
>> AssignedStreamsTasks]
>>>> stream-thread [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-
>> 0a4b8d6753a2-StreamThread-16]
>>>> Suspending stream task 0_55 failed due to the following error:
>>>> org.apache.kafka.streams.errors.ProcessorStateException: task [0_55]
>>>> Failed to flush state store upsert-store
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> ProcessorStateManager.flush(ProcessorStateManager.java:246)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> AbstractTask.flushState(AbstractTask.java:195)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamTask.flushState(StreamTask.java:339)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamTask$1.run(StreamTask.java:312)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:211)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamTask.commit(StreamTask.java:307)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamTask.suspend(StreamTask.java:440)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamTask.suspend(StreamTask.java:422)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> AssignedTasks.suspendTasks(AssignedTasks.java:182)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> AssignedTasks.suspend(AssignedTasks.java:147)
>>>>       at org.apache.kafka.streams.processor.internals.TaskManager.
>>>> suspendTasksAndState(TaskManager.java:242)
>>>>       at org.apache.kafka.streams.processor.internals.StreamThread$
>>>> RebalanceListener.onPartitionsRevoked(StreamThread.java:291)
>>>>       at org.apache.kafka.clients.consumer.internals.
>>>> ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:414)
>>>>       at org.apache.kafka.clients.consumer.internals.
>>>> AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:359)
>>>>       at org.apache.kafka.clients.consumer.internals.
>>>> AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
>>>>       at org.apache.kafka.clients.consumer.internals.
>>>> ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>>>>       at org.apache.kafka.clients.consumer.KafkaConsumer.
>>>> pollOnce(KafkaConsumer.java:1149)
>>>>       at org.apache.kafka.clients.consumer.KafkaConsumer.poll(
>>>> KafkaConsumer.java:1115)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamThread.pollRequests(StreamThread.java:831)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamThread.runOnce(StreamThread.java:788)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamThread.runLoop(StreamThread.java:749)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamThread.run(StreamThread.java:719)
>>>> Caused by: org.apache.kafka.streams.errors.StreamsException: task
>> [0_55]
>>>> Abort sending since an error caught with a previous record (key
>>>> de796abe-d174-40ee-99fb-36695447402e value [B@79f4a0fd timestamp
>>>> 1537976392104) to topic upsert-merger-stream-oa43-1-
>> upsert-store-changelog
>>>> due to Cannot perform send because at least one previous transactional
>> or
>>>> idempotent request has failed with errors.
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> RecordCollectorImpl.send(RecordCollectorImpl.java:197)
>>>>       at org.apache.kafka.streams.state.internals.
>>>> StoreChangeLogger.logChange(StoreChangeLogger.java:59)
>>>>       at org.apache.kafka.streams.state.internals.
>>>> ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStor
>>>> e.java:69)
>>>>       at org.apache.kafka.streams.state.internals.
>>>> ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStor
>>>> e.java:29)
>>>>       at org.apache.kafka.streams.state.internals.
>> CachingKeyValueStore.
>>>> putAndMaybeForward(CachingKeyValueStore.java:105)
>>>>       at org.apache.kafka.streams.state.internals.
>>>> CachingKeyValueStore.access$000(CachingKeyValueStore.java:38)
>>>>       at org.apache.kafka.streams.state.internals.
>>>> CachingKeyValueStore$1.apply(CachingKeyValueStore.java:83)
>>>>       at org.apache.kafka.streams.state.internals.NamedCache.
>>>> flush(NamedCache.java:142)
>>>>       at org.apache.kafka.streams.state.internals.NamedCache.
>>>> flush(NamedCache.java:100)
>>>>       at org.apache.kafka.streams.state.internals.ThreadCache.
>>>> flush(ThreadCache.java:127)
>>>>       at org.apache.kafka.streams.state.internals.
>>>> CachingKeyValueStore.flush(CachingKeyValueStore.java:123)
>>>>       at org.apache.kafka.streams.state.internals.
>>>> InnerMeteredKeyValueStore.flush(InnerMeteredKeyValueStore.java:267)
>>>>       at org.apache.kafka.streams.state.internals.
>>>> MeteredKeyValueBytesStore.flush(MeteredKeyValueBytesStore.java:149)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> ProcessorStateManager.flush(ProcessorStateManager.java:243)
>>>>       ... 21 common frames omitted
>>>> Caused by: org.apache.kafka.common.KafkaException: Cannot perform send
>>>> because at least one previous transactional or idempotent request has
>>>> failed with errors.
>>>>       at org.apache.kafka.clients.producer.internals.
>> TransactionManager.
>>>> failIfNotReadyForSend(TransactionManager.java:279)
>>>>       at org.apache.kafka.clients.producer.internals.
>> TransactionManager.
>>>> maybeAddPartitionToTransaction(TransactionManager.java:264)
>>>>       at org.apache.kafka.clients.producer.KafkaProducer.doSend(
>>>> KafkaProducer.java:828)
>>>>       at org.apache.kafka.clients.producer.KafkaProducer.send(
>>>> KafkaProducer.java:784)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> RecordCollectorImpl.send(RecordCollectorImpl.java:153)
>>>>       ... 34 common frames omitted
>>>> Caused by: org.apache.kafka.common.errors.ProducerFencedException:
>>>> Producer attempted an operation with an old epoch. Either there is a
>> newer
>>>> producer with the same transactionalId, or the producer's transaction
>> has
>>>> been expired by the broker.
>>>> [2018-09-26T15:39:54,666Z]  [WARN ]  [kafka-producer-network-thread |
>>>> upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-
>>>> 0a4b8d6753a2-StreamThread-16-0_55-producer]  [o.a.k.s.p.i.
>> RecordCollectorImpl]
>>>> task [0_55] Error sending record (key 5a07e803-9323-457c-8e1d-
>> 29c0b0bc0fa9
>>>> value [0, 0, 1, 102, 22, -119, 88, -32, 0, 0, 0, -95, 97, 112, 112, 100,
>>>> 121, 110, 97, 109, 105, 99, 115, 95, 101, 101, 101, 49, 100, 52, 102,
>> 56,
>>>> 45, 54, 55, 97, 50, 45, 52, 57, 56, 101, 45, 97, 55, 50, 53, 45, 52, 55,
>>>> 101, 50, 57, 56, 48, 51, 56, 50, 50, 101, -62, -79, 98, 105, 122, 95,
>> 116,
>>>> 120, 110, 95, 118, 49, -62, -79, 57, 54, 53, 49, 48, 100, 50, 52, -62,
>> -79,
>>>> 48, -62, -79, 115, 101, 103, 109, 101, 110, 116, 115, -62, -79, 114,
>> 101,
>>>> 113, 117, 101, 115, 116, 71, 85, 73, 68, -62, -79, 53, 97, 48, 55, 101,
>> 56,
>>>> 48, 51, 45, 57, 51, 50, 51, 45, 52, 53, 55, 99, 45, 56, 101, 49, 100,
>> 45,
>>>> 50, 57, 99, 48, 98, 48, 98, 99, 48, 102, 97, 57, -62, -79, 102, 97, 108,
>>>> 115, 101, -62, -79, 116, 114, 117, 101, -62, -79, 102, 97, 108, 115,
>> 101,
>>>> -62, -79, 102, 97, 108, 115, 101, 123, 10, 34, 101, 118, 101, 110, 116,
>> 84,
>>>> 105, 109, 101, 115, 116, 97, 109, 112, 34, 58, 34, 50, 48, 49, 56, 45,
>> 48,
>>>> 57, 45, 50, 54, 84, 49, 53, 58, 51, 57, 58, 52, 52, 46, 49, 50, 50, 43,
>> 48,
>>>> 48, 48, 48, 34, 44, 10, 34, 115, 101, 103, 109, 101, 110, 116, 115, 34,
>> 58,
>>>> 91, 123, 10, 34, 115, 101, 103, 109, 101, 110, 116, 84, 105, 109, 101,
>> 115,
>>>> 116, 97, 109, 112, 34, 58, 34, 50, 48, 49, 56, 45, 48, 57, 45, 50, 54,
>> 84,
>>>> 49, 53, 58, 51, 57, 58, 52, 52, 46, 49, 50, 50, 43, 48, 48, 48, 48, 34,
>> 44,
>>>> 10, 34, 117, 115, 101, 114, 68, 97, 116, 97, 34, 58, 123, 10, 34, 65,
>> 99,
>>>> 99, 111, 117, 110, 116, 78, 97, 109, 101, 34, 58, 34, 112, 114, 105,
>> 118,
>>>> 97, 108, 105, 97, 118, 101, 110, 116, 97, 100, 105, 114, 101, 99, 116,
>> 97,
>>>> 115, 97, 45, 97, 48, 113, 51, 52, 48, 48, 48, 48, 48, 100, 106, 115,
>> 118,
>>>> 52, 101, 97, 100, 34, 44, 34, 69, 115, 73, 110, 100, 101, 120, 67, 108,
>>>> 117, 115, 116, 101, 114, 34, 58, 34, 112, 114, 100, 52, 52, 45, 49, 50,
>> 34,
>>>> 10, 125, 44, 10, 34, 116, 105, 101, 114, 34, 58, 34, 97, 112, 105, 95,
>> 112,
>>>> 114, 100, 52, 53, 50, 34, 44, 34, 116, 105, 101, 114, 73, 100, 34, 58,
>> 34,
>>>> 51, 57, 57, 52, 57, 50, 55, 49, 34, 44, 34, 110, 111, 100, 101, 34, 58,
>> 34,
>>>> 97, 112, 105, 118, 49, 45, 48, 49, 52, 45, 112, 114, 100, 52, 53, 50,
>> 34,
>>>> 44, 34, 110, 111, 100, 101, 73, 100, 34, 58, 34, 49, 57, 52, 54, 57, 54,
>>>> 53, 55, 48, 34, 44, 34, 114, 101, 113, 117, 101, 115, 116, 69, 120, 112,
>>>> 101, 114, 105, 101, 110, 99, 101, 34, 58, 34, 78, 79, 82, 77, 65, 76,
>> 34,
>>>> 44, 34, 101, 110, 116, 114, 121, 80, 111, 105, 110, 116, 34, 58, 116,
>> 114,
>>>> 117, 101, 44, 34, 117, 110, 105, 113, 117, 101, 83, 101, 103, 109, 101,
>>>> 110, 116, 73, 100, 34, 58, 53, 44, 34, 116, 114, 97, 110, 115, 97, 99,
>> 116,
>>>> 105, 111, 110, 84, 105, 109, 101, 34, 58, 49, 49, 10, 125, 93, 44, 10,
>> 34,
>>>> 97, 112, 112, 108, 105, 99, 97, 116, 105, 111, 110, 34, 58, 34, 112,
>> 114,
>>>> 100, 52, 52, 45, 97, 110, 97, 108, 121, 116, 105, 99, 115, 34, 44, 34,
>> 97,
>>>> 112, 112, 108, 105, 99, 97, 116, 105, 111, 110, 73, 100, 34, 58, 34, 53,
>>>> 49, 48, 54, 34, 44, 34, 114, 101, 113, 117, 101, 115, 116, 71, 85, 73,
>> 68,
>>>> 34, 58, 34, 53, 97, 48, 55, 101, 56, 48, 51, 45, 57, 51, 50, 51, 45, 52,
>>>> 53, 55, 99, 45, 56, 101, 49, 100, 45, 50, 57, 99, 48, 98, 48, 98, 99,
>> 48,
>>>> 102, 97, 57, 34, 44, 34, 116, 114, 97, 110, 115, 97, 99, 116, 105, 111,
>>>> 110, 78, 97, 109, 101, 34, 58, 34, 97, 112, 105, 95, 118, 51, 46, 92,
>> 47,
>>>> 118, 51, 92, 47, 101, 118, 101, 110, 116, 115, 92, 47, 66, 114, 111,
>> 119,
>>>> 115, 101, 114, 82, 101, 99, 111, 114, 100, 92, 47, 101, 118, 101, 110,
>> 116,
>>>> 34, 44, 34, 116, 114, 97, 110, 115, 97, 99, 116, 105, 111, 110, 73, 100,
>>>> 34, 58, 56, 48, 50, 54, 50, 56, 10, 125] timestamp 1537976391822) to
>> topic
>>>> upsert-merger-stream-oa43-1-upsert-store-changelog due to Producer
>>>> attempted an operation with an old epoch. Either there is a newer
>> producer
>>>> with the same transactionalId, or the producer's transaction has been
>>>> expired by the broker.; No more records will be sent and no more offsets
>>>> will be recorded for this task.
>>>> 
>>>> 
>>>> [2018-09-26T15:39:54,784Z]  [ERROR]  [upsert-merger-stream-oa43-1-
>>>> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]  [o.a.k.s.p.i.
>> AssignedStreamsTasks]
>>>> stream-thread [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-
>> 0a4b8d6753a2-StreamThread-16]
>>>> After suspending failed, closing the same stream task 0_55 failed again
>> due
>>>> to the following error:
>>>> org.apache.kafka.common.KafkaException: Cannot execute transactional
>>>> method because we are in an error state
>>>>       at org.apache.kafka.clients.producer.internals.
>> TransactionManager.
>>>> maybeFailWithError(TransactionManager.java:784)
>>>>       at org.apache.kafka.clients.producer.internals.
>>>> TransactionManager.beginAbort(TransactionManager.java:229)
>>>>       at org.apache.kafka.clients.producer.KafkaProducer.
>>>> abortTransaction(KafkaProducer.java:660)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamTask.closeSuspended(StreamTask.java:493)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamTask.close(StreamTask.java:553)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> AssignedTasks.suspendTasks(AssignedTasks.java:192)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> AssignedTasks.suspend(AssignedTasks.java:147)
>>>>       at org.apache.kafka.streams.processor.internals.TaskManager.
>>>> suspendTasksAndState(TaskManager.java:242)
>>>>       at org.apache.kafka.streams.processor.internals.StreamThread$
>>>> RebalanceListener.onPartitionsRevoked(StreamThread.java:291)
>>>>       at org.apache.kafka.clients.consumer.internals.
>>>> ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:414)
>>>>       at org.apache.kafka.clients.consumer.internals.
>>>> AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:359)
>>>>       at org.apache.kafka.clients.consumer.internals.
>>>> AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
>>>>       at org.apache.kafka.clients.consumer.internals.
>>>> ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>>>>       at org.apache.kafka.clients.consumer.KafkaConsumer.
>>>> pollOnce(KafkaConsumer.java:1149)
>>>>       at org.apache.kafka.clients.consumer.KafkaConsumer.poll(
>>>> KafkaConsumer.java:1115)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamThread.pollRequests(StreamThread.java:831)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamThread.runOnce(StreamThread.java:788)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamThread.runLoop(StreamThread.java:749)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamThread.run(StreamThread.java:719)
>>>> Caused by: org.apache.kafka.common.errors.ProducerFencedException:
>>>> Producer attempted an operation with an old epoch. Either there is a
>> newer
>>>> producer with the same transactionalId, or the producer's transaction
>> has
>>>> been expired by the broker.
>>>> 
>>>> 
>>>> [2018-09-26T15:39:54,801Z]  [ERROR]  [upsert-merger-stream-oa43-1-
>>>> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]
>>>> [o.a.k.s.p.internals.StreamThread]  stream-thread
>>>> [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-
>> 0a4b8d6753a2-StreamThread-16]
>>>> Error caught during partition revocation, will abort the current process
>>>> and re-throw at the end of rebalance: stream-thread
>>>> [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-
>> 0a4b8d6753a2-StreamThread-16]
>>>> failed to suspend stream tasks
>>>> [2018-09-26T15:39:54,801Z]  [INFO ]  [upsert-merger-stream-oa43-1-
>>>> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]
>>>> [o.a.k.s.p.internals.StreamThread]  stream-thread
>>>> [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-
>> 0a4b8d6753a2-StreamThread-16]
>>>> partition revocation took 156 ms.
>>>>       suspended active tasks: [0_55]
>>>>       suspended standby tasks: [0_50]
>>>> [2018-09-26T15:39:54,801Z]  [INFO ]  [upsert-merger-stream-oa43-1-
>>>> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]  [o.a.k.c.c.i.
>> AbstractCoordinator]
>>>> [Consumer clientId=upsert-merger-stream-oa43-1-53c3713a-443f-4b89-
>>>> a27a-0a4b8d6753a2-StreamThread-16-consumer,
>> groupId=upsert-merger-stream-oa43-1]
>>>> (Re-)joining group
>>>> 
>>>> [2018-09-26T15:39:56,277Z]  [INFO ]  [upsert-merger-stream-oa43-1-
>>>> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]  [o.a.k.c.c.i.
>> AbstractCoordinator]
>>>> [Consumer clientId=upsert-merger-stream-oa43-1-53c3713a-443f-4b89-
>>>> a27a-0a4b8d6753a2-StreamThread-16-consumer,
>> groupId=upsert-merger-stream-oa43-1]
>>>> Successfully joined group with generation 113
>>>> 
>>>> [2018-09-26T15:39:56,278Z]  [INFO ]  [upsert-merger-stream-oa43-1-
>>>> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]  [o.a.k.c.c.i.
>> ConsumerCoordinator]
>>>> [Consumer clientId=upsert-merger-stream-oa43-1-53c3713a-443f-4b89-
>>>> a27a-0a4b8d6753a2-StreamThread-16-consumer,
>> groupId=upsert-merger-stream-oa43-1]
>>>> Setting newly assigned partitions [oa43-1-event-upsert-48]
>>>> [2018-09-26T15:39:56,278Z]  [INFO ]  [upsert-merger-stream-oa43-1-
>>>> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-15]  [o.a.k.c.c.i.
>> ConsumerCoordinator]
>>>> [Consumer clientId=upsert-merger-stream-oa43-1-53c3713a-443f-4b89-
>>>> a27a-0a4b8d6753a2-StreamThread-15-consumer,
>> groupId=upsert-merger-stream-oa43-1]
>>>> Setting newly assigned partitions [oa43-1-event-upsert-65]
>>>> [2018-09-26T15:39:56,278Z]  [INFO ]  [upsert-merger-stream-oa43-1-
>>>> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]
>>>> [o.a.k.s.p.internals.StreamThread]  stream-thread
>>>> [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-
>> 0a4b8d6753a2-StreamThread-16]
>>>> State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
>>>> 
>>>> [2018-09-26T15:39:56,495Z]  [INFO ]  [kafka-producer-network-thread |
>>>> upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-
>>>> 0a4b8d6753a2-StreamThread-16-0_48-producer]  [o.a.k.c.p.i.
>> TransactionManager]
>>>> [Producer clientId=upsert-merger-stream-oa43-1-53c3713a-443f-4b89-
>>>> a27a-0a4b8d6753a2-StreamThread-16-0_48-producer,
>>>> transactionalId=upsert-merger-stream-oa43-1-0_48] ProducerId set to 13
>>>> with epoch 81
>>>> [2018-09-26T15:39:56,495Z]  [INFO ]  [upsert-merger-stream-oa43-1-
>>>> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]
>>>> [o.a.k.s.p.internals.StreamThread]  stream-thread
>>>> [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-
>> 0a4b8d6753a2-StreamThread-16]
>>>> partition assignment took 217 ms.
>>>>       current active tasks: [0_48]
>>>>       current standby tasks: [0_49]
>>>>       previous active tasks: [0_55]
>>>> 
>>>> [2018-09-26T15:39:56,496Z]  [INFO ]  [upsert-merger-stream-oa43-1-
>>>> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]
>>>> [o.a.k.s.p.internals.StreamThread]  stream-thread
>>>> [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-
>> 0a4b8d6753a2-StreamThread-16]
>>>> State transition from PARTITIONS_ASSIGNED to PENDING_SHUTDOWN
>>>> [2018-09-26T15:39:56,496Z]  [INFO ]  [upsert-merger-stream-oa43-1-
>>>> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]
>>>> [o.a.k.s.p.internals.StreamThread]  stream-thread
>>>> [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-
>> 0a4b8d6753a2-StreamThread-16]
>>>> Shutting down
>>>> 
>>> 
>>> 
>>> --
>>> -- Guozhang
>> 
>> 
> 
> 
> -- 
> -- Guozhang
