Lukasz Gluchowski created KAFKA-7088:
----------------------------------------

             Summary: Kafka streams thread waits infinitely on transaction init
                 Key: KAFKA-7088
                 URL: https://issues.apache.org/jira/browse/KAFKA-7088
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 1.0.1
         Environment: Linux 4.14.33-51.37.amzn1.x86_64 #1 SMP Thu May 3 20:07:43 UTC 2018
kafka-streams (client) 1.0.1
kafka broker 1.1.0
Java version:
OpenJDK Runtime Environment (build 1.8.0_171-b10)
OpenJDK 64-Bit Server VM (build 25.171-b10, mixed mode)

Kafka config overrides:
num.stream.threads: 6
session.timeout.ms: 10000
request.timeout.ms: 11000
fetch.max.wait.ms: 500
max.poll.records: 1000

topic has 24 partitions
            Reporter: Lukasz Gluchowski
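
For reference, the overrides listed above can be written as a plain java.util.Properties object of the kind passed to the KafkaStreams constructor. This is a sketch, not the reporter's actual code: the application.id and bootstrap.servers values are hypothetical placeholders, and processing.guarantee=exactly_once is an assumption inferred from the thread dump, since Kafka Streams 1.0 only calls KafkaProducer.initTransactions() when exactly-once processing is enabled.

{code:java}
import java.util.Properties;

public class StreamsOverrides {
    // Builds the client overrides from this report using plain string keys.
    static Properties buildOverrides() {
        Properties props = new Properties();
        // Hypothetical placeholders: the real values are not given in the report.
        props.setProperty("application.id", "example-app");
        props.setProperty("bootstrap.servers", "broker:9092");
        // Assumption: exactly-once is inferred from initTransactions() in the stack trace.
        props.setProperty("processing.guarantee", "exactly_once");
        // Overrides as listed in the report.
        props.setProperty("num.stream.threads", "6");
        props.setProperty("session.timeout.ms", "10000");
        props.setProperty("request.timeout.ms", "11000");
        props.setProperty("fetch.max.wait.ms", "500");
        props.setProperty("max.poll.records", "1000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildOverrides();
        // Print keys in sorted order for a stable listing.
        props.stringPropertyNames().stream().sorted()
            .forEach(k -> System.out.println(k + "=" + props.getProperty(k)));
    }
}
{code}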


A Kafka Streams application thread stops processing without any feedback. The
topic has 24 partitions, and I noticed that processing stopped only for some of
them. I will describe what happened to partition 10. The application is still
running (now for about 8 hours), that thread is still hanging, and no
rebalancing has taken place.

There is no error (we have a custom `Thread.UncaughtExceptionHandler`, which was
not called). After a couple of minutes the stream stopped processing at offset
32606948, while the log-end-offset is 33472402 (a lag of 865454).
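A plausible reason the handler stayed silent: a Thread.UncaughtExceptionHandler is invoked only when a thread terminates because of an exception it did not catch, and a thread parked in CountDownLatch.await() never terminates. The JDK-only sketch below is not Kafka code (the class and thread names are hypothetical); it shows the handler firing for a thread that throws, but not for one that blocks indefinitely.

{code:java}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class HandlerDemo {
    // Runs the task on a daemon thread with a counting handler, waits up to
    // joinMs for the thread to finish, and returns how often the handler ran.
    static int handlerCalls(Runnable task, long joinMs) {
        AtomicInteger calls = new AtomicInteger();
        Thread t = new Thread(task, "demo-thread");
        t.setDaemon(true); // lets the JVM exit even if the thread stays blocked
        t.setUncaughtExceptionHandler((thread, e) -> calls.incrementAndGet());
        t.start();
        try {
            t.join(joinMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return calls.get();
    }

    public static void main(String[] args) {
        // A thread that dies from an exception: the handler is called once.
        int thrown = handlerCalls(() -> { throw new IllegalStateException("boom"); }, 2000);
        // A thread parked on a latch nobody releases: the handler is never called.
        CountDownLatch never = new CountDownLatch(1);
        int blocked = handlerCalls(() -> {
            try {
                never.await(); // untimed wait, like the stream thread in the dump
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, 200);
        System.out.println("thrown=" + thrown + " blocked=" + blocked); // prints "thrown=1 blocked=0"
    }
}
{code}

In Kafka Streams 1.0 such a handler is registered through KafkaStreams.setUncaughtExceptionHandler(...), so it can only report stream threads that actually die.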

The broker itself does not report any active consumer in that consumer group,
and the only information I was able to gather was from a thread dump:
{code:java}
"mp_ads_publisher_pro_madstorage-web-corotos-prod-9db804ae-2a7a-431f-be09-392ab38cd8a2-StreamThread-33" #113 prio=5 os_prio=0 tid=0x00007fe07c6349b0 nid=0xf7a waiting on condition [0x00007fe0215d4000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000fec6a2f8> (a java.util.concurrent.CountDownLatch$Sync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
at org.apache.kafka.clients.producer.internals.TransactionalRequestResult.await(TransactionalRequestResult.java:50)
at org.apache.kafka.clients.producer.KafkaProducer.initTransactions(KafkaProducer.java:554)
at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:151)
at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:404)
at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:365)
at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:350)
at org.apache.kafka.streams.processor.internals.TaskManager.addStreamTasks(TaskManager.java:137)
at org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:88)
at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:259)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:264)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:851)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:808)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
{code}
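
The frames TransactionalRequestResult.await, then CountDownLatch.await(CountDownLatch.java:231), then acquireSharedInterruptibly indicate an untimed wait: the thread parks until the transaction-initialization result is completed, and nothing in these frames bounds that wait. The JDK-only sketch below uses hypothetical names and is not Kafka's implementation; it contrasts the untimed await(), which leaves a thread in WAITING (parking) as in the dump, with the timed await(long, TimeUnit), which gives up after a deadline.

{code:java}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchWaitDemo {
    // Timed wait: returns false if the latch is not released within timeoutMs.
    static boolean awaitBounded(CountDownLatch pending, long timeoutMs) {
        try {
            return pending.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Starts a daemon thread doing an untimed await() and reports the state it settles in.
    static Thread.State untimedWaiterState(CountDownLatch pending) {
        Thread waiter = new Thread(() -> {
            try {
                pending.await(); // no timeout: parks until countDown() or interrupt
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "demo-StreamThread-1");
        waiter.setDaemon(true);
        waiter.start();
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        // Poll until the waiter has parked (or a safety deadline passes).
        while (waiter.getState() != Thread.State.WAITING && System.nanoTime() < deadline) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        Thread.State state = waiter.getState();
        waiter.interrupt(); // clean up: the untimed await() honors interrupts
        return state;
    }

    public static void main(String[] args) {
        CountDownLatch neverCompleted = new CountDownLatch(1); // models a result that never arrives
        System.out.println("untimed waiter state: " + untimedWaiterState(neverCompleted));
        System.out.println("timed wait completed: " + awaitBounded(neverCompleted, 200));
    }
}
{code}

Run standalone, this should report the untimed waiter as WAITING and the timed wait as not completed, which matches the difference between a thread that hangs silently and one that could surface a timeout.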
 

I tried restarting the application once, but the situation repeated: the thread
read some data, committed the offset, and stopped processing, leaving the thread
in the waiting state.
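
Because the stuck thread produces no error, one way to surface it is an in-process check over Thread.getAllStackTraces(). This is a hypothetical illustration, not an existing Kafka feature: the class name and matching rules are assumptions. It flags threads whose name contains "StreamThread" (the naming seen in the dump above) that are WAITING with a KafkaProducer.initTransactions frame on the stack.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class StuckInitDetector {
    static final String PRODUCER_CLASS = "org.apache.kafka.clients.producer.KafkaProducer";

    // True if a thread looks like a stream thread parked inside initTransactions().
    static boolean looksStuck(String name, Thread.State state, StackTraceElement[] frames) {
        if (!name.contains("StreamThread") || state != Thread.State.WAITING) {
            return false;
        }
        for (StackTraceElement f : frames) {
            if (PRODUCER_CLASS.equals(f.getClassName()) && "initTransactions".equals(f.getMethodName())) {
                return true;
            }
        }
        return false;
    }

    // Scans live threads in this JVM. State and stack are sampled separately,
    // so this is a heuristic snapshot, not an atomic view.
    static List<String> findStuckThreads() {
        List<String> stuck = new ArrayList<>();
        for (Map.Entry<Thread, StackTraceElement[]> e : Thread.getAllStackTraces().entrySet()) {
            Thread t = e.getKey();
            if (looksStuck(t.getName(), t.getState(), e.getValue())) {
                stuck.add(t.getName());
            }
        }
        return stuck;
    }

    public static void main(String[] args) {
        // Outside a Kafka Streams JVM this finds nothing; inside one it lists suspect threads.
        System.out.println("suspect threads: " + findStuckThreads());
    }
}
{code}

A single sample can catch a normal, short-lived wait, so a practical check would repeat it and only alert when the same thread stays there across several samples.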



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
