[ https://issues.apache.org/jira/browse/KAFKA-5242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16010974#comment-16010974 ]

Lukas Gemela commented on KAFKA-5242:
-------------------------------------

[~mjsax] by multiple instances do you mean multiple JVMs (nodes) running, or 
multiple instances running within the same JVM process? 

What happened was that, by accident, we created two instances running within a 
single JVM process, touching the same data on the hard drive:
new KafkaStreams(builder, streamsConfig).start(); 
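
In full, what we did boils down to something like this (a minimal sketch; the 
application id, broker address and topology are placeholders, but the state dir 
matches our logs):

{code}
import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TopologyBuilder;

public class DoubleStart {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "clio");              // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/app/db");                // shared state dir
        StreamsConfig streamsConfig = new StreamsConfig(props);
        TopologyBuilder builder = new TopologyBuilder(); // stands in for the real topology

        // Both instances share application.id and state.dir, so their stream
        // threads end up competing for the same task directories on disk.
        new KafkaStreams(builder, streamsConfig).start(); // intended instance
        new KafkaStreams(builder, streamsConfig).start(); // accidental duplicate, same JVM
    }
}
{code}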

If this is a valid way to run Kafka Streams, then there is definitely a bug in 
the locking mechanism. I've attached logfiles for this situation, unfortunately 
only with the log level set to INFO.

Regarding the backoff strategy, you could do something similar to what the Akka 
library does (cap it with a maximal duration): 
http://doc.akka.io/japi/akka/2.4/akka/pattern/Backoff.html 
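
Concretely, the capping part could look like this (a hand-rolled Java sketch, 
not the Akka API; the initial delay and cap are made-up values):

{code}
// Akka-style capped exponential delay: grows by doubling but is clamped
// to a maximal duration so a single retry never waits for hours.
long nextBackoffMs(int attempt) {
    final long initialBackoffMs = 50;  // assumed initial delay
    final long maxBackoffMs = 60_000;  // the "maximal duration" cap
    long delay = initialBackoffMs << Math.min(attempt, 20); // bound the shift to avoid overflow
    return Math.min(delay, maxBackoffMs);
}
{code}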

Thanks!

L.


> add max_number_of_retries to exponential backoff strategy
> ---------------------------------------------------------
>
>                 Key: KAFKA-5242
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5242
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 0.10.2.0
>            Reporter: Lukas Gemela
>            Priority: Minor
>
> From time to time, during rebalance, we are getting a lot of exceptions saying 
> {code}
> org.apache.kafka.streams.errors.LockException: task [0_0] Failed to lock the state directory: /app/db/clio/0_0
> 	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102) ~[kafka-streams-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) ~[kafka-streams-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) ~[kafka-streams-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834) ~[kafka-streams-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207) ~[kafka-streams-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180) [kafka-streams-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937) [kafka-streams-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69) [kafka-streams-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236) [kafka-streams-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255) [kafka-clients-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339) [kafka-clients-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [kafka-clients-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286) [kafka-clients-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030) [kafka-clients-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [kafka-clients-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582) [kafka-streams-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368) [kafka-streams-0.10.2.0.jar!/:?]
> {code}
> (see attached logfile)
> It was actually a problem on our side - we ran startStreams() twice and 
> therefore had two threads touching the same folder structure. 
> But what I've noticed is that the backoff strategy in 
> StreamThread$AbstractTaskCreator.retryWithBackoff can run endlessly - after 
> 20 iterations it takes 6 hours until the next attempt to start a task. 
> I've noticed the latest code contains a check for rebalanceTimeoutMs, but 
> that still does not solve the problem, especially when 
> MAX_POLL_INTERVAL_MS_CONFIG is set to Integer.MAX_VALUE. At this stage Kafka 
> Streams just hangs indefinitely.
> I would personally make that backoff strategy a bit more configurable, with a 
> maximum number of retries; once it is exceeded, the exception is propagated, 
> like any other exception, to the custom client exception handler.
> (I can provide a patch)
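> A rough sketch of the kind of change I mean (the maxRetries parameter and its 
> wiring to a new config are made up, and the real retryWithBackoff in 
> StreamThread looks different):
> {code}
> import org.apache.kafka.streams.errors.LockException;
>
> // Hypothetical variant of AbstractTaskCreator.retryWithBackoff with a retry
> // limit: once the limit is hit, the LockException is rethrown so it reaches
> // the client's exception handler instead of looping forever.
> void retryWithBackoff(Runnable action, int maxRetries) {
>     long backoffTimeMs = 50L; // assumed initial delay, doubled each attempt
>     for (int attempt = 1; ; attempt++) {
>         try {
>             action.run();
>             return;
>         } catch (LockException e) {
>             if (attempt >= maxRetries) {
>                 throw e; // propagate instead of retrying forever
>             }
>             try {
>                 Thread.sleep(backoffTimeMs);
>             } catch (InterruptedException ie) {
>                 Thread.currentThread().interrupt();
>                 throw e;
>             }
>             backoffTimeMs *= 2; // exponential growth, as in the current code
>         }
>     }
> }
> {code}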


