[jira] [Commented] (KAFKA-5242) add max_number _of_retries to exponential backoff strategy
[ https://issues.apache.org/jira/browse/KAFKA-5242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16043753#comment-16043753 ]

Matthias J. Sax commented on KAFKA-5242:

While trying to add a new integration test for Streams EOS, I encountered a similar issue that prohibited the test design. In order to make the test work, I needed to use two instances with one thread each and different state directories to isolate both, instead of using one instance with two threads (cf. {{EosIntegrationTest#shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances()}}).

In order to fix this properly, we should extract state restoration from the rebalance callback functions and embed task creation and state restoration within the main run-loop. We should also call poll() regularly if we cannot get a lock for a task and during store restoration. Right now, for big state, we might miss a rebalance -- but we should actually try to participate in the rebalance and "abort" state restoration if a task is migrated away during the rebalance.

> add max_number _of_retries to exponential backoff strategy
> ----------------------------------------------------------
>
>                 Key: KAFKA-5242
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5242
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 0.10.2.0
>            Reporter: Lukas Gemela
>            Assignee: Matthias J. Sax
>            Priority: Minor
>             Fix For: 0.10.2.2, 0.11.0.1
>
>         Attachments: clio_170511.log
>
>
> From time to time, during rebalance we are getting a lot of exceptions saying
> {code}
> org.apache.kafka.streams.errors.LockException: task [0_0] Failed to lock the state directory: /app/db/clio/0_0
> 	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102) ~[kafka-streams-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) ~[kafka-streams-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) ~[kafka-streams-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834) ~[kafka-streams-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207) ~[kafka-streams-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180) [kafka-streams-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937) [kafka-streams-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69) [kafka-streams-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236) [kafka-streams-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255) [kafka-clients-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339) [kafka-clients-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [kafka-clients-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286) [kafka-clients-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030) [kafka-clients-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [kafka-clients-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582) [kafka-streams-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368) [kafka-streams-0.10.2.0.jar!/:?]
> {code}
> (see attached logfile)
> It was actually a problem on our side - we ran startStreams() twice and therefore we had two threads touching the same folder structure.
> But what I've noticed is that the backoff strategy in StreamThread$AbstractTaskCreator.retryWithBackoff can run endlessly - after 20 iterations it takes 6 hours until the next attempt to start a task.
> I've noticed the latest code contains a check for rebalanceTimeoutMs, but that still does not solve the problem, especially in case MAX_POLL_INTERVAL_MS_CONFIG is set to Integer.MAX_VALUE. At this stage Kafka Streams just hangs indefinitely.
> I would personally make that backoff strategy a bit more configurable with a number of retries, such that if it exceeds a configured value it propagates the exception like any other exception to the custom client exception handler.
> (I can provide a patch)
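The capped backoff the reporter asks for can be sketched as follows. This is a hypothetical standalone policy, not Kafka's actual {{StreamThread$AbstractTaskCreator.retryWithBackoff}}; the class name, parameters, and defaults are illustrative only:

```java
// Hypothetical sketch of the proposed policy: exponential backoff with an
// upper bound on the per-attempt delay and a hard cap on the retry count,
// after which the failure surfaces to the caller (e.g. an exception handler)
// instead of retrying forever.
public class CappedBackoff {
    private final long initialDelayMs;
    private final long maxDelayMs;
    private final int maxRetries;

    public CappedBackoff(long initialDelayMs, long maxDelayMs, int maxRetries) {
        this.initialDelayMs = initialDelayMs;
        this.maxDelayMs = maxDelayMs;
        this.maxRetries = maxRetries;
    }

    /** Delay before the given 0-based attempt; doubles each time but never exceeds maxDelayMs. */
    public long delayForAttempt(int attempt) {
        if (attempt >= maxRetries) {
            throw new IllegalStateException("retries exhausted after " + maxRetries + " attempts");
        }
        // Cap the shift amount so the doubling cannot overflow a long.
        long uncapped = initialDelayMs << Math.min(attempt, 40);
        return Math.min(uncapped, maxDelayMs);
    }
}
```

Without the cap, an initial delay of 100 ms doubled 20 times is 100 * 2^20 ms, roughly 29 hours, which illustrates how uncapped doubling produces the multi-hour waits described above; {{maxDelayMs}} keeps individual waits short while {{maxRetries}} guarantees the LockException eventually propagates.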
[jira] [Commented] (KAFKA-5242) add max_number _of_retries to exponential backoff strategy
[ https://issues.apache.org/jira/browse/KAFKA-5242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16011025#comment-16011025 ]

Matthias J. Sax commented on KAFKA-5242:

Both should work. From a Streams perspective, it makes no difference whether you run multiple threads within one {{KafkaStreams}} instance, multiple {{KafkaStreams}} instances within one JVM, or multiple JVMs (with one {{KafkaStreams}} instance each) on the same host. This should all work, including any combination. How many threads do you use? If you run a single {{KafkaStreams}} instance with a single thread, the issue cannot occur, as you need at least two threads running -- this would explain why the lock issue is exposed by your bug of starting multiple instances. Anyway, as mentioned above, please upgrade to {{0.10.2.1}} -- we fixed a couple of lock issues there.
[jira] [Commented] (KAFKA-5242) add max_number _of_retries to exponential backoff strategy
[ https://issues.apache.org/jira/browse/KAFKA-5242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16010974#comment-16010974 ]

Lukas Gemela commented on KAFKA-5242:

[~mjsax] By multiple instances, do you mean multiple JVMs (nodes) running, or multiple instances running within the same JVM process? What happened was that by accident we created two instances running within a single JVM process, touching the same data on the hard drive:

new KafkaStreams(builder, streamsConfig).start();

If this is a supported way to run Kafka Streams, then there is definitely a bug in the locking mechanism. I've attached logfiles for this situation, unfortunately only with the log level set to INFO.

Regarding the backoff strategy, you could do something similar to how it's done in the Akka library (cap it with a maximal duration): http://doc.akka.io/japi/akka/2.4/akka/pattern/Backoff.html

Thanks! L.
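As a practical guard against the accidental double-start described above, each {{KafkaStreams}} instance created inside the same JVM can be given its own {{state.dir}}, so that even a duplicated start() never contends for the same task lock. A minimal sketch using plain Properties; the helper name and directory layout are made up for illustration, while "state.dir" and "application.id" are the standard Streams config keys:

```java
import java.util.Properties;

public class IsolatedStreamsConfigs {
    // Hypothetical helper: builds per-instance Streams configs that differ
    // only in state.dir, so two instances in one JVM write to disjoint state
    // directories and cannot race for the same task-directory lock.
    public static Properties configFor(String appId, String bootstrapServers,
                                       String stateDirBase, int instanceId) {
        Properties props = new Properties();
        props.put("application.id", appId);              // StreamsConfig.APPLICATION_ID_CONFIG
        props.put("bootstrap.servers", bootstrapServers); // StreamsConfig.BOOTSTRAP_SERVERS_CONFIG
        props.put("state.dir", stateDirBase + "/instance-" + instanceId); // StreamsConfig.STATE_DIR_CONFIG
        return props;
    }
}
```

Each Properties object would then back its own {{new KafkaStreams(builder, streamsConfig)}}, one per instance.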
[jira] [Commented] (KAFKA-5242) add max_number _of_retries to exponential backoff strategy
[ https://issues.apache.org/jira/browse/KAFKA-5242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16010936#comment-16010936 ]

Matthias J. Sax commented on KAFKA-5242:

We fixed a couple of state directory lock issues in {{0.10.2.1}} -- thus I am wondering if it is already fixed there?

About the retry logic: the fact that you created multiple {{KafkaStreams}} instances and started both should not have any influence on the behavior and should not cause the issue. If it hangs forever, it's a bug in lock management and should be independent of starting one or two instances -- maybe multiple instances expose the bug with higher probability, but it should not be the root cause. You should only get more parallel running instances, and load should be redistributed over more threads.

We do retry infinitely at the moment, as we know that the lock will be released eventually (as long as there is no bug). We try to keep the number of config values as small as possible. Thus, I am wondering if it might be sufficient to hard-code a max retry time or count? We might also put an upper bound on the back-off time -- the current strategy seems to be too aggressive. WDYT?