Lukas Gemela created KAFKA-5242:
-----------------------------------

             Summary: state task directory locked forever
                 Key: KAFKA-5242
                 URL: https://issues.apache.org/jira/browse/KAFKA-5242
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 0.10.2.0
            Reporter: Lukas Gemela
         Attachments: clio_170511.log

>From time to time, during relabance we are getting a lot of exceptions saying 

{code}
org.apache.kafka.streams.errors.LockException: task [0_0] Failed to lock the 
state directory: /app/db/clio/0_0
        at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102)
 ~[kafka-streams-0.10.2.0.jar!/:?]
        at 
org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
 ~[kafka-streams-0.10.2.0.jar!/:?]
        at 
org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
 ~[kafka-streams-0.10.2.0.jar!/:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
 ~[kafka-streams-0.10.2.0.jar!/:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
 ~[kafka-streams-0.10.2.0.jar!/:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
 [kafka-streams-0.10.2.0.jar!/:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
 [kafka-streams-0.10.2.0.jar!/:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
 [kafka-streams-0.10.2.0.jar!/:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
 [kafka-streams-0.10.2.0.jar!/:?]
        at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
 [kafka-clients-0.10.2.0.jar!/:?]
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
 [kafka-clients-0.10.2.0.jar!/:?]
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
 [kafka-clients-0.10.2.0.jar!/:?]
        at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
 [kafka-clients-0.10.2.0.jar!/:?]
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
 [kafka-clients-0.10.2.0.jar!/:?]
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) 
[kafka-clients-0.10.2.0.jar!/:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
 [kafka-streams-0.10.2.0.jar!/:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
 [kafka-streams-0.10.2.0.jar!/:?]
{code}

(see attached logfile)

By looking at the code looks like the some old tasks are never being closed and 
the lock is never released.

Also, the backoff strategy in StreamThread$AbstractTaskCreator.retryWithBackoff 
can run endlessly - after 20 iterations it takes 6hours until the next attempt 
to start a task. 
I've noticed latest code contains check for rebalanceTimeoutMs, but that still 
does not solve the problem especially in case MAX_POLL_INTERVAL_MS_CONFIG is 
set to Integer.MAX_INT. 

I would personally make that backoffstrategy a bit more configurable with a 
number of retries that if it exceed a configured value it propagates the 
exception as any other exception to custom client exception handler.
(I can provide a patch)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to