Re: Kafka Streams Failed to rebalance error
But my stream definition does not have a state store at all, RocksDB or in-memory... That's the most concerning part... On Wed, Jun 7, 2017 at 9:48 PM Sachin Mittal wrote: > One instance with 10 threads may cause RocksDB issues. How much RAM do you have? > > Also check CPU wait time. Many RocksDB instances on one machine (depending > on the number of partitions) may cause a lot of disk I/O, increasing wait times and > slowing down message processing, which leads to frequent rebalances. > > Also, how many partitions does your topic have? My experience is having one thread per > partition is ideal. > > Thanks > Sachin > > [earlier messages and stack trace snipped]
Re: Kafka Streams Failed to rebalance error
One instance with 10 threads may cause RocksDB issues. How much RAM do you have? Also check CPU wait time. Many RocksDB instances on one machine (depending on the number of partitions) may cause a lot of disk I/O, increasing wait times and slowing down message processing, which leads to frequent rebalances. Also, how many partitions does your topic have? My experience is having one thread per partition is ideal. Thanks Sachin On Thu, Jun 8, 2017 at 9:58 AM, João Peixoto wrote: > There is one instance with 10 threads. > > [earlier messages and stack trace snipped]
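For reference, the one-thread-per-partition advice above maps to a single Streams setting. A minimal sketch, assuming a 10-partition input topic (the application id and bootstrap address are placeholders):

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // placeholder
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
    // One stream thread per input partition: extra threads beyond the
    // partition count sit idle, while fewer threads stack several tasks
    // (each with its own RocksDB instance) onto the same thread.
    props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 10);
    KafkaStreams streams = new KafkaStreams(builder, props);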
Re: Kafka Streams Failed to rebalance error
There is one instance with 10 threads. On Wed, Jun 7, 2017 at 9:07 PM Guozhang Wang wrote: > João, > > Do you also have multiple instances running in parallel, and how many > threads are you running within each instance? > > Guozhang > > [earlier messages and stack trace snipped]
Re: Kafka Streams Failed to rebalance error
João, Do you also have multiple instances running in parallel, and how many threads are you running within each instance? Guozhang On Wed, Jun 7, 2017 at 3:18 PM, João Peixoto wrote: > Eno before I do so I just want to be sure this would not be a duplicate. I > just found the following issues: > > * https://issues.apache.org/jira/browse/KAFKA-5167. Marked as being fixed > on 0.11.0.0/0.10.2.2 (both not released afaik) > * https://issues.apache.org/jira/browse/KAFKA-5070. Currently in progress > > [earlier messages and stack trace snipped]
kafka streams checkpoint and restore understanding
I have two app instances; the input topic has 2 partitions, and each instance is configured with one thread and one standby replica. Also, instance1's state store is /tmp/kafka-streams and instance2's is /tmp/kafka-streams2. I ran this experiment to study checkpointing in Kafka Streams (0.10.0.0).

1. start instance1, send two msgs (msg1 and msg2) to the input topic; no checkpoint file
2. stop instance1; checkpoint file appears in /tmp/kafka-streams, both partitions' offsets equal 1
3. restart instance1 (although a new UUID means a new instance, consider it the same instance as before); no checkpoint file again
4. start instance2, send two msgs (msg3 and msg4) to the input topic; still no checkpoint file
5. stop instance1; checkpoint in /tmp/kafka-streams, both partitions' offsets equal 2
6. send two msgs (msg5 and msg6) to the input topic; these two msgs both go to instance2
7. stop instance2; checkpoint in /tmp/kafka-streams2, both partitions' offsets equal 3

After both instances are stopped, below is the checkpoint file content:

$ strings kafka-streams/*/*/.checkpoint   --> instance1
streams-wc-Counts-changelog 0 2
streams-wc-Counts-changelog 1 2
$ strings kafka-streams2/*/*/.checkpoint  --> instance2
streams-wc-Counts-changelog 0 3
streams-wc-Counts-changelog 1 3

A simple table of each msg's partition and offset, and the events that happened:

     | Partition,Offset | What happened
msg1 | P0,0             |
msg2 | P1,0             | restart instance1
msg3 | P0,1             | after start instance2
msg4 | P1,1             |
msg5 | P0,2             | after stop instance1
msg6 | P1,2             |

Next, use kafka-consumer-offset-checker.sh to check the input topic; all six msgs (three per partition) were consumed:

$ bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --topic streams-wc-input1 --group streams-wc
Group       Topic              Pid  Offset  logSize  Lag  Owner
streams-wc  streams-wc-input1  0    3       3        0    none
streams-wc  streams-wc-input1  1    3       3        0    none

Now, if we restart instance1 again: as only one instance exists, the standby task will not take effect. That means instance1 will create two active StreamTasks, and both tasks will restoreActiveState from the changelog topic. When restoring an active task, we have to seek to some position; here, as instance1's state store (/tmp/kafka-streams) checkpoint file has both changelog partitions, the restoreConsumer will seek to position 2.

Change to another situation: what about restarting instance2 instead of instance1? The restoreConsumer will seek to position 3, because instance2's active tasks read the checkpoint in /tmp/kafka-streams2. The difference between these two situations is the position from which each StreamTask's StateStore begins to restore.

Situation one, only restart instance1: beginning position 2 means P0 seeks to after msg3 and P1 to after msg4.
Situation two, only restart instance2: beginning position 3 means P0 seeks to after msg5 and P1 to after msg6.

From the partition view, msg1, msg3, msg5 go to partition 0:

key,value | P0 offset | Instance1 restart | Instance2 restart
msg1,1    | 0         |                   |
msg3,1    | 1         |                   |
msg5,1    | 2         | <- seek at pos 2  |
(end)     |           |                   | <- seek at pos 3

key,value | P1 offset | Instance1 restart | Instance2 restart
msg2,1    | 0         |                   |
msg4,1    | 1         |                   |
msg6,1    | 2         | <- seek at pos 2  |
(end)     |           |                   | <- seek at pos 3

The restore process polls records from the changelog topic starting at that specific position:
in situation one, it restores msg5 and msg6 into the StreamTasks' state stores (msg5 to task1, msg6 to task0);
in situation two, it restores nothing into the StreamTasks' state stores???

I have some questions about the restore process and checkpoints:
1. Should we seek to the beginning, because the restored state store must be a complete view of the previous one?
2. In the two situations described above, what will actually happen?
Hope someone can explain this to me, or correct me if my understanding is wrong. Thanks in advance.
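For anyone reproducing this experiment: the .checkpoint file appears to be plain text -- a version line, an entry count, then one "topic partition offset" line per entry, matching the strings output above. A minimal reader sketch, assuming that layout:

    import java.io.IOException;
    import java.nio.file.*;
    import java.util.*;

    // Reads a Kafka Streams .checkpoint file (assumed layout: version line,
    // entry-count line, then "topic partition offset" lines).
    public class CheckpointReader {
        public static Map<String, Long> read(Path file) throws IOException {
            List<String> lines = Files.readAllLines(file);
            Map<String, Long> offsets = new LinkedHashMap<>();
            int entries = Integer.parseInt(lines.get(1).trim()); // line 0 is the version
            for (int i = 2; i < 2 + entries; i++) {
                String[] parts = lines.get(i).trim().split("\\s+");
                offsets.put(parts[0] + "-" + parts[1], Long.parseLong(parts[2]));
            }
            return offsets;
        }

        public static void main(String[] args) throws IOException {
            read(Paths.get(args[0])).forEach((tp, off) ->
                    System.out.println(tp + " -> " + off));
        }
    }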
Re: Async Non Blocking Kafka Producer
If you are setting acks=0 then you don't care about losing data even when the cluster is up. The only way to get at-least-once is acks=all. > On Jun 7, 2017, at 1:12 PM, Ankit Jain wrote: > > Thanks hans. > > It would work, but the producer will start losing data even when the cluster is > available. > > Thanks > Ankit Jain > >> On Wed, Jun 7, 2017 at 12:56 PM, Hans Jespersen wrote: >> >> Try adding props.put("max.block.ms", "0"); >> >> -hans >> >> >> >>> On Jun 7, 2017, at 12:24 PM, Ankit Jain wrote: >>> >>> Hi, >>> >>> We want to use a non-blocking Kafka producer. The producer thread >> should >>> not block if the Kafka cluster is down or not reachable. >>> >>> Currently, we are setting the following properties, but the producer thread is >>> still blocking if the Kafka cluster goes down or becomes unreachable: >>> >>> * props.put("block.on.buffer.full", "false");* >>> * props.put("acks", "0");* >>> >>> -- >>> Thanks >> >> > > > -- > Thanks, > Ankit Jain
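To make the two knobs in this thread concrete: max.block.ms=0 makes send() fail immediately instead of blocking when the brokers are unreachable, while acks=all is what preserves at-least-once delivery. A sketch combining them (broker address and topic are placeholders):

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // placeholder
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("max.block.ms", "0");   // fail fast in send() instead of blocking
    props.put("acks", "all");         // acks=0 silently drops data; "all" gives at-least-once
    Producer<String, String> producer = new KafkaProducer<>(props);
    producer.send(new ProducerRecord<>("my-topic", "key", "value"), (metadata, e) -> {
        if (e != null) {
            // handle the failure asynchronously (retry queue, metrics, ...)
            e.printStackTrace();
        }
    });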
Re: Kafka Streams Failed to rebalance error
Eno before I do so I just want to be sure this would not be a duplicate. I just found the following issues: * https://issues.apache.org/jira/browse/KAFKA-5167. Marked as being fixed on 0.11.0.0/0.10.2.2 (both not released afaik) * https://issues.apache.org/jira/browse/KAFKA-5070. Currently in progress On Wed, Jun 7, 2017 at 2:24 PM Eno Thereska wrote: > Hi there, > > This might be a bug, would you mind opening a JIRA (copy-pasting below is > sufficient). > > Thanks > Eno > > [original message and stack trace snipped]
Re: Reliably implementing global KeyValueStore#get
Indeed, all good points. Thanks all for the continuing valuable feedback! > On Jun 7, 2017, at 3:07 PM, Matthias J. Sax wrote: > > If you write to a remote DB, keep in mind that this will impact your > Streams app, as you lose data locality. > > Thus, populating a DB from the changelog might be better. It also > decouples both systems, which gives you the advantage that your Streams app > can still run if the DB has issues. If you write directly into the DB and the DB > is not available, the Streams app is doomed to fail too. > > > -Matthias > > [earlier messages snipped]
Re: Reliably implementing global KeyValueStore#get
If you write to a remote DB, keep in mind that this will impact your Streams app, as you lose data locality. Thus, populating a DB from the changelog might be better. It also decouples both systems, which gives you the advantage that your Streams app can still run if the DB has issues. If you write directly into the DB and the DB is not available, the Streams app is doomed to fail too. -Matthias On 6/7/17 2:54 PM, Jan Filipiak wrote: > Depends, embedded Postgres puts you into the same spot. > > But if you use your state store changelog to materialize into a > Postgres, that might work out decently. > The current JDBC sink doesn't support deletes, which is an issue, but writing a > custom sink is not too hard. > > Best Jan > > [earlier messages snipped]
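A minimal sketch of the populate-a-DB-from-the-changelog idea, including the tombstone handling Jan notes that plain JDBC sinks miss. The topic name, JDBC URL, and table schema are assumptions (the upsert needs Postgres 9.5+ and the Postgres JDBC driver on the classpath):

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.util.Collections;
    import java.util.Properties;

    public class ChangelogToPostgres {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // placeholder
            props.put("group.id", "changelog-to-postgres");    // placeholder
            props.put("auto.offset.reset", "earliest");
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
                 Connection db = DriverManager.getConnection(
                         "jdbc:postgresql://localhost/kv")) {   // placeholder
                consumer.subscribe(Collections.singletonList("my-app-Counts-changelog")); // placeholder
                // Assumed table: kv(key BYTEA PRIMARY KEY, value BYTEA)
                PreparedStatement upsert = db.prepareStatement(
                        "INSERT INTO kv (key, value) VALUES (?, ?) "
                                + "ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value");
                PreparedStatement delete = db.prepareStatement("DELETE FROM kv WHERE key = ?");
                while (true) {
                    for (ConsumerRecord<byte[], byte[]> rec : consumer.poll(1000)) {
                        if (rec.value() == null) { // tombstone -> delete, the case JDBC sinks miss
                            delete.setBytes(1, rec.key());
                            delete.executeUpdate();
                        } else {
                            upsert.setBytes(1, rec.key());
                            upsert.setBytes(2, rec.value());
                            upsert.executeUpdate();
                        }
                    }
                }
            }
        }
    }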
Re: Reliably implementing global KeyValueStore#get
Depends, embedded Postgres puts you into the same spot. But if you use your state store changelog to materialize into a Postgres, that might work out decently. The current JDBC sink doesn't support deletes, which is an issue, but writing a custom sink is not too hard. Best Jan On 07.06.2017 23:47, Steven Schlansker wrote: [earlier messages snipped]
Re: Reliably implementing global KeyValueStore#get
I was actually considering writing my own KeyValueStore backed by e.g. a Postgres or the like. Is there some feature Connect gains me that would make it better than such an approach? thanks > On Jun 7, 2017, at 2:20 PM, Jan Filipiak wrote: > > Hi, > > have you thought about using Connect to put data into a store that is more > reasonable for your kind of query requirements? > > Best Jan > > On 07.06.2017 00:29, Steven Schlansker wrote: >>> On Jun 6, 2017, at 2:52 PM, Damian Guy wrote: >>> >>> Steven, >>> >>> In practice, data shouldn't be migrating that often. If it is then you >>> probably have bigger problems. >> Understood and agreed, but when designing distributed systems, it usually >> helps to model for the worst case rather than the "well that should never >> happen" case, lest you find yourself fixing those bugs at 3am instead :) >> >> I'd like to be able to induce extreme pain at the Kafka layer (change leader >> every 3 seconds and migrate all partitions around randomly) and still have >> my app behave correctly. >> >>> You should be able to use the metadata api >>> to find the instance the key should be on and then when you check that node >>> you can also check with the metadata api that the key should still be on >>> this host. If streams is rebalancing while you query an exception will be >>> raised and you'll need to retry the request once the rebalance has >>> completed. >> Agreed here as well. But let's assume I have a very fast replication >> setup (assume it takes zero time, for the sake of argument) -- I'm fairly >> sure there's still a race here as this exception only fires *during a >> migration* >> not *after a migration that may have invalidated your metadata lookup >> completes* >> >>> HTH, >>> Damian >>> >>> On Tue, 6 Jun 2017 at 18:11 Steven Schlansker >>> wrote: >>> > On Jun 6, 2017, at 6:16 AM, Eno Thereska wrote: > > Hi Steven, > > Do you know beforehand if a key exists? If you know that and are getting null() the code will have to retry by refreshing the metadata and going to the new instance. If you don’t know beforehand if a key exists or not you might have to check all instances of a store to make sure. No, I am not presupposing that the key can exist -- this is a user visible API and will be prone to "accidents" :) Thanks for the insight. I worry that even checking all stores is not truly sufficient, as querying all workers at different times in the presence of migrating data can still in theory miss it given pessimal execution. I'm sure I've long wandered off into the hypothetical, but I dream of some day being cool like Jepsen :) > Eno > > >> On Jun 5, 2017, at 10:12 PM, Steven Schlansker < sschlans...@opentable.com> wrote: >> Hi everyone, me again :) >> >> I'm still trying to implement my "remoting" layer that allows >> my clients to see the partitioned Kafka Streams state >> regardless of which instance they hit. Roughly, my lookup is: >> >> Message get(Key key) { >> RemoteInstance instance = selectPartition(key); >> return instance.get(key); // http remoting >> } >> >> RemoteInstance.get(Key key) { // http endpoint >> return readOnlyKeyValueStore.get(key); >> } >> >> However, the mapping of partitions to instances may change. >> If you call KeyValueStore.get(K) where K is on a partition you >> don't own, it returns null. This is indistinguishable from a >> successful get on a key that doesn't exist.
>> >> If one instance selects a sibling instance right as the partition is failing >> off of that instance, it may get routed there and by the time it gets >> the request no longer "owns" the partition -- returns a false 'null'. >> >> You can try re-checking after you get a null value, but that's susceptible >> to the same race -- it's unlikely but possible that the data migrates *back* >> before you do this re-check. >> >> Is there any way to correctly implement this without races? I'd imagine >> you need a new primitive like KeyValueStore#get that atomically finds >> the key or throws an exception if it is not in an owned partition >> at the time of lookup so you know to recheck the partition and retry. >> >> Thoughts? >> >> Thanks again, >> Steven
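A sketch of the re-validation pattern this thread converged on, using the interactive-queries metadata API; the store name and the isLocal()/remoteGet() helpers are hypothetical. As discussed above, re-checking ownership after the read narrows the race window but cannot close it completely:

    // Hypothetical retry wrapper; "my-store" and the isLocal()/remoteGet()
    // helpers are placeholders for application code.
    Message get(KafkaStreams streams, ReadOnlyKeyValueStore<String, Message> localStore,
                String key, int maxRetries) {
        StringSerializer keySer = new StringSerializer();
        for (int i = 0; i < maxRetries; i++) {
            StreamsMetadata before = streams.metadataForKey("my-store", key, keySer);
            Message value = isLocal(before)
                    ? localStore.get(key)
                    : remoteGet(before.hostInfo(), key); // http remoting, as above
            StreamsMetadata after = streams.metadataForKey("my-store", key, keySer);
            // Only trust the answer if ownership did not move while we were reading.
            if (before.hostInfo().equals(after.hostInfo())) {
                return value;
            }
        }
        throw new IllegalStateException("partition kept migrating; giving up");
    }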
Re: kafka streams changelog topic value
ConsoleConsumer by default uses a String deserializer, but the value in the changelog is of type long. For the output topic, the type is converted from long to string though -- thus you can read the output topic without problems. For reading the changelog topic, you need to specify the option --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer (hope I got the option right :)) -Matthias On 6/7/17 6:40 AM, john cheng wrote: > the right way to see the changelog persisted by RocksDB is to use the ByteArrayDeserializer, and > then decode the bytes to a hex string > > props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, > "org.apache.kafka.common.serialization.ByteArrayDeserializer"); > > for (ConsumerRecord<byte[], byte[]> record : consumerRecords) { > > System.out.println(bytesToHexString(record.value())); > > } > > public static String bytesToHexString(byte[] src) { > StringBuilder stringBuilder = new StringBuilder(""); > if (src == null || src.length <= 0) { > return null; > } > for (int i = 0; i < src.length; i++) { > int v = src[i] & 0xFF; > String hv = Integer.toHexString(v); > if (hv.length() < 2) { > stringBuilder.append(0); > } > stringBuilder.append(hv); > } > return stringBuilder.toString(); > } > > output now correct: > > p:0,o:0,k:msg1,v:0001 > p:0,o:1,k:msg3,v:0001 > p:0,o:2,k:msg5,v:0001 > p:0,o:3,k:msg1,v:0002 > p:0,o:4,k:msg3,v:0002 > p:1,o:0,k:msg2,v:0001 > p:1,o:1,k:msg4,v:0001 > p:1,o:2,k:msg2,v:0002 > p:1,o:3,k:msg2,v:0003 > > > 2017-06-07 18:42 GMT+08:00 john cheng : >> I added some logging to StoreChangeLogger: >> >> for (K k : this.dirty) { >> V v = getter.get(k); >> log.info("logChange key:{},value:{}", k, v); >> collector.send(new ProducerRecord<>(this.topic, this.partition, k, v), >> keySerializer, valueSerializer); >> } >> >> and found the printed result is normal, just some bytes: >> >> [2017-06-07 18:39:43,131] INFO logChange removed:[], dirty:[kafka] (org.apache.kafka.streams.state.internals.StoreChangeLogger) >> [2017-06-07 18:39:43,132] INFO logChange key:kafka,value:[0, 0, 0, 2] (org.apache.kafka.streams.state.internals.StoreChangeLogger) >> >> but the console consumer still outputs kafka: with an empty value, >> so this may be a problem of the console consumer. >> >> 2017-06-07 18:24 GMT+08:00 john cheng : >>> I'm running WordCountProcessorDemo with the Processor API, and changed the following: >>> 1. configured 1 stream thread and 1 replica >>> 2. changed inMemory() to persistent() >>> My Kafka version is 0.10.0.0. After running the streaming application, I checked the msg output with the console consumer: >>> ➜ kafka_2.10-0.10.0.0 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-input2 >>> msg1 # with the console producer we can only produce a message's value, so messages produced to the input topic use round-robin partitioning >>> msg2 >>> msg3 >>> msg4 >>> msg5 >>> msg6 >>> ➜ kafka_2.10-0.10.0.0 bin/kafka-console-consumer.sh --new-consumer --bootstrap-server localhost:9092 --topic streams-input2 --property print.key=true --property key.separator=":" --from-beginning >>> null:msg2 # key is null, value is what we produced above >>> null:msg4 >>> null:msg6 >>> null:msg1 >>> null:msg3 >>> null:msg5 >>> ➜ kafka_2.10-0.10.0.0 bin/kafka-console-consumer.sh --new-consumer --bootstrap-server localhost:9092 --topic streams-output2 --property print.key=true --property key.separator=":" --from-beginning >>> msg2:1 >>> msg1:1 >>> msg1:1 >>> msg3:1 >>> msg2:1 >>> msg4:1 >>> msg1:1 >>> msg3:1 >>> msg5:1 >>> msg2:1 >>> msg4:1 >>> msg6:1 # due to log compaction, the same key will be overwritten. this is ok...
>>> ➜ kafka_2.10-0.10.0.0 bin/kafka-console-consumer.sh --new-consumer --bootstrap-server localhost:9092 --topic streams-wordcount-Counts-changelog --property print.key=true --property key.separator=":" --from-beginning >>> *msg2:* >>> *msg4:* >>> *msg6:* >>> *msg1:* >>> *msg3:* >>> *msg5:* >>> Everything is ok, except the changelog topic troubles me. Why are its values empty? >>> >>> I have dug into the source code, and summarize the workflow: >>> 1. when WordCountProcessorDemo's ProcessorNode puts a kv pair into the KeyValueStore, no matter whether in-memory or RocksDB, it is put into local storage, and the msg is then appended to the StoreChangeLogger >>> 2. the key is appended to the StoreChangeLogger first, and maybeLogChange is invoked, passing a ValueGetter; this getter will fetch the value from local storage when the logChange() operation happens >>> 3. when logChange() on the StoreChangeLogger happens, a KeyValue message is sent to the changelog topic, here streams-wordcount-Counts-changelog >>> 4. the StoreChangeLogger uses dirty and removed Sets to swap between logChange() and add(). >>> >>> for (K k : this.dirty) { // the logChange() method flushes dirty keys to the changelog topic >>> V v = getter.get(k); // the value getter fetches the value from local storage >>> collector.send(new ProducerRecord<>(this.topic, this.partition, k, v
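The same inspection can also be done from Java, sidestepping the console consumer's deserializer option entirely; a small sketch (broker address and group id are placeholders, topic name as in the thread):

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import java.util.Collections;
    import java.util.Properties;

    public class ChangelogInspector {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder
            props.put("group.id", "changelog-inspector");     // placeholder
            props.put("auto.offset.reset", "earliest");
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.LongDeserializer"); // counts are longs
            try (KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("streams-wordcount-Counts-changelog"));
                while (true) {
                    for (ConsumerRecord<String, Long> rec : consumer.poll(1000)) {
                        System.out.printf("p:%d,o:%d,k:%s,v:%d%n",
                                rec.partition(), rec.offset(), rec.key(), rec.value());
                    }
                }
            }
        }
    }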
Re: Reliably implementing global KeyValueStore#get
Hi, have you thought about using Connect to put data into a store that is more reasonable for your kind of query requirements? Best Jan On 07.06.2017 00:29, Steven Schlansker wrote: [earlier messages snipped]
Re: Kafka Streams Failed to rebalance error
Hi there, This might be a bug, would you mind opening a JIRA (copy-pasting below is sufficient). Thanks Eno > On 7 Jun 2017, at 21:38, João Peixoto wrote: > > I'm using Kafka Streams 0.10.2.1 and I still see this error > > 2017-06-07 20:28:37.211 WARN 73 --- [ StreamThread-1] > o.a.k.s.p.internals.StreamThread : Could not create task 0_31. Will > retry. > > org.apache.kafka.streams.errors.LockException: task [0_31] Failed to lock > the state directory for task 0_31 > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100) > ~[kafka-streams-0.10.2.1.jar!/:na] > at > org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) > ~[kafka-streams-0.10.2.1.jar!/:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) > ~[kafka-streams-0.10.2.1.jar!/:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) > [kafka-streams-0.10.2.1.jar!/:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) > ~[kafka-streams-0.10.2.1.jar!/:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) > ~[kafka-streams-0.10.2.1.jar!/:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) > [kafka-streams-0.10.2.1.jar!/:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) > [kafka-streams-0.10.2.1.jar!/:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) > [kafka-streams-0.10.2.1.jar!/:na] > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) > [kafka-clients-0.10.2.1.jar!/:na] > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) > [kafka-clients-0.10.2.1.jar!/:na] > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) > [kafka-clients-0.10.2.1.jar!/:na] > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) > [kafka-clients-0.10.2.1.jar!/:na] > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) > [kafka-clients-0.10.2.1.jar!/:na] > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) > [kafka-clients-0.10.2.1.jar!/:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) > [kafka-streams-0.10.2.1.jar!/:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) > [kafka-streams-0.10.2.1.jar!/:na] > > > It has been printing it for hours now, so it does not recover at all. > The most worrying thing is that this stream definition does not even use > state stores, it literally looks like this: > > KStreamBuilder builder = new KStreamBuilder(); >    KStream kStream = > builder.stream(appOptions.getInput().getTopic()); >    kStream.process(() -> processor); >    new KafkaStreams(builder, streamsConfiguration); > > The "processor" does its thing and calls "context().commit()" when done. > That's it. Looking at the actual machine running the instance, the folders > under /tmp/kafka-streams// only have a .lock file.
> > This seems to have been bootstrapped by the exception: > > org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be > completed since the group has already rebalanced and assigned the > partitions to another member. This means that the time between subsequent > calls to poll() was longer than the configured max.poll.interval.ms, which > typically implies that the poll loop is spending too much time message > processing. You can address this either by increasing the session timeout > or by reducing the maximum size of batches returned in poll() with > max.poll.records. > > We are addressing the latter by reducing "max.poll.records" and increasing " > commit.interval.ms", nonetheless, shouldn't Kafka Streams not worry about > state dirs if there are no state stores? Since it doesn't seem to do so > automatically, can I configured it somehow to achieve this end? > > Additionally, what could lead to it not being able to recover? > > On Tue, May 16, 2017 at 3:17 PM Matthias J. Sax > wrote: > >> Great! :) >> >> On 5/16/17 2:31 AM, Sameer Kumar wrote: >>> I see now that my Kafka cluster is very stable, and these errors dont >> come >>> now. >>> >>> -Sameer. >>> >>> On Fri, May 5, 2017 at 7:53 AM, Sameer Kumar >> wrote: >>> Yes, I have upgraded my cluster and client both to version 10.2.1 and currently monitoring the situation. Will report back in case I find any errors. Thanks for the help though. -Sameer. On Fri,
Re: [DISCUSS]: KIP-161: streams record processing exception handlers
Hi Eno, On 07.06.2017 22:49, Eno Thereska wrote: Comments inline: On 5 Jun 2017, at 18:19, Jan Filipiak wrote: Hi, just my few thoughts. On 05.06.2017 11:44, Eno Thereska wrote: Hi there, Sorry for the late reply, I was out this past week. Looks like good progress was made with the discussions either way. Let me recap a couple of points I saw into one big reply: 1. Jan mentioned CRC errors. I think this is a good point. As these happen in Kafka, before Kafka Streams gets a chance to inspect anything, I'd like to hear the opinion of more Kafka folks like Ismael or Jason on this one. Currently the documentation is not great with what to do once a CRC check has failed. From looking at the code, it looks like the client gets a KafkaException (bubbled up from the fetcher) and currently we in streams catch this as part of poll() and fail. It might be advantageous to treat CRC handling in a similar way to serialisation handling (e.g., have the option to fail/skip). Let's see what the other folks say. Worst-case we can do a separate KIP for that if it proved too hard to do in one go. There is no reasonable way to "skip" a CRC error. How can you know the length you read was anything reasonable? You might be completely lost inside your response. On the client side, every record received is checked for validity. As it happens, if the CRC check fails the exception is wrapped with a KafkaException that is thrown all the way to poll(). Assuming we change that and poll() throws a CRC exception, I was thinking we could treat it similarly to a deserialize exception and pass it to the exception handler to decide what to do. Default would be to fail. This might need a Kafka KIP btw and can be done separately from this KIP, but Jan, would you find this useful? I don't think so. IMO you cannot reasonably continue parsing when the checksum of a message is not correct. If you are not sure you got the correct length, how can you be sure to find the next record? I would always straight fail in all cases. It's too hard for me to understand why one would try to continue. I mentioned CRCs because that's the only bad pill I ever saw so far. But I am happy that it just stopped and I could check what was going on. This will also be invasive in the client code then. If you ask me, I am always going to vote for "grind to a halt": let the developers see what happened and let them fix it. It helps build good Kafka experiences and better software and architectures. For me this is: "force the user to do the right thing". https://youtu.be/aAb7hSCtvGw?t=374 e.g. not letting unexpected input slip by. Letting unexpected input slip by is what brought us 15+ years of war of all sorts of ingestion attacks. I don't even dare to estimate how many missing-records search teams are going to be formed, maybe some HackerOne for stream apps :D Best, Jan At a minimum, handling this type of exception will need to involve the exactly-once (EoS) logic. We'd still allow the option of failing or skipping, but EoS would need to clean up by rolling back all the side effects from the processing so far. Matthias, how does this sound? EoS will not help; the record might be 5 or 6 repartitions down into the topology. I haven't followed, but I pray you made EoS optional! We don't need this and we don't want this and we will turn it off if it comes. So I wouldn't recommend relying on it. The option to turn it off is better than forcing it and still being unable to roll back bad pills (as explained before). Yeah, as Matthias mentioned, EoS is optional. Thanks, Eno
6. Will add an end-to-end example as Michael suggested. Thanks Eno On 4 Jun 2017, at 02:35, Matthias J. Sax wrote: What I don't understand is this: From there on it's the easiest way forward: fix, redeploy, start => done If you have many producers that work fine and a new "bad" producer starts up and writes bad data into your input topic, your Streams app dies but all your producers, including the bad one, keep writing. Thus, how would you fix this, as you cannot "remove" the corrupted data from the topic? It might take some time to identify the root cause and stop the bad producer. Up to this point you get good and bad data into your Streams input topic. If the Streams app is not able to skip over those bad records, how would you get all the good data from the topic? Not saying it's not possible, but it's extra work copying the data with a new non-Streams consumer-producer-app into a new topic and then feed your Streams app from this new topic -- you also need to update all your upstream producers to write to the new topic. Thus, if you want to fail fast, you can still do this. And after you detected and fixed the bad producer you might just reconfigure your app to skip bad records until it reaches the good part of the data. Afterwards, you could redeploy with fail-fast again. Thus, for this pattern, I actually don't see any reason to stop the Streams app
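To make the fail/skip option concrete, a sketch of the kind of handler contract being discussed; the interface and enum names are illustrative only, not the final KIP-161 API:

    import org.apache.kafka.clients.consumer.ConsumerRecord;

    // Illustrative only: invoked when a record cannot be deserialized, so the
    // application can choose per record between failing fast (Jan's position)
    // and skipping, e.g. after logging or copying the raw bytes to a
    // dead-letter topic (Matthias's bad-producer scenario).
    public interface RecordExceptionHandler {
        enum Response { FAIL, CONTINUE }

        Response handle(ConsumerRecord<byte[], byte[]> record, Exception exception);
    }

A strict fail-fast deployment would register a handler that always returns FAIL; during the cleanup phase Matthias describes, the same app could temporarily run with one that returns CONTINUE.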
Re: Reliably implementing global KeyValueStore#get
Hi Steven, You are right in principle. The thing is that what we shipped in Kafka is just the low level bare bones that in a sense belong to Kafka. A middle layer that keeps track of the data is absolutely needed, and it should hopefully hide the distributed system challenges from the end user. Now the question is what such a layer should look like. I think in all systems there are some basic assumptions made about frequency of failures and rebalances just to keep the number of retries sane. I agree with you that in principle a rebalance could always be happening though. Eno > On 6 Jun 2017, at 23:29, Steven Schlansker wrote: > > >> On Jun 6, 2017, at 2:52 PM, Damian Guy wrote: >> >> Steven, >> >> In practice, data shouldn't be migrating that often. If it is then you >> probably have bigger problems. > > Understood and agreed, but when designing distributed systems, it usually > helps to model for the worst case rather than the "well that should never > happen" case, lest you find yourself fixing those bugs at 3am instead :) > > I'd like to be able to induce extreme pain at the Kafka layer (change leader > every 3 seconds and migrate all partitions around randomly) and still have > my app behave correctly. > >> You should be able to use the metadata api >> to find the instance the key should be on and then when you check that node >> you can also check with the metadata api that the key should still be on >> this host. If streams is rebalancing while you query an exception will be >> raised and you'll need to retry the request once the rebalance has >> completed. > > Agreed here as well. But let's assume I have a very fast replication > setup (assume it takes zero time, for the sake of argument) -- I'm fairly > sure there's still a race here as this exception only fires *during a > migration* > not *after a migration that may have invalidated your metadata lookup > completes* > >> >> HTH, >> Damian >> >> On Tue, 6 Jun 2017 at 18:11 Steven Schlansker >> wrote: >> >>> On Jun 6, 2017, at 6:16 AM, Eno Thereska wrote: Hi Steven, Do you know beforehand if a key exists? If you know that and are getting >>> null, the code will have to retry by refreshing the metadata and going to >>> the new instance. If you don't know beforehand if a key exists or not you >>> might have to check all instances of a store to make sure. >>> >>> No, I am not presupposing that the key can exist -- this is a user visible >>> API and will >>> be prone to "accidents" :) >>> >>> Thanks for the insight. I worry that even checking all stores is not >>> truly sufficient, >>> as querying all workers at different times in the presence of >>> migrating data >>> can still in theory miss it given pessimal execution. >>> >>> I'm sure I've long wandered off into the hypothetical, but I dream of some >>> day being >>> cool like Jepsen :) >>> Eno > On Jun 5, 2017, at 10:12 PM, Steven Schlansker < >>> sschlans...@opentable.com> wrote: > > Hi everyone, me again :) > > I'm still trying to implement my "remoting" layer that allows > my clients to see the partitioned Kafka Streams state > regardless of which instance they hit. Roughly, my lookup is: > > Message get(Key key) { > RemoteInstance instance = selectPartition(key); > return instance.get(key); // http remoting > } > > RemoteInstance.get(Key key) { // http endpoint > return readOnlyKeyValueStore.get(key); > } > > However, the mapping of partitions to instances may change. > If you call KeyValueStore.get(K) where K is on a partition you > don't own, it returns null.
This is indistinguishable from a > successful get on a key that doesn't exist. > > If one instance selects a sibling instance right as the partition is >>> failing > off of that instance, it may get routed there and by the time it gets > the request no longer "owns" the partition -- returns a false 'null'. > > You can try re-checking after you get a null value, but that's >>> susceptible > to the same race -- it's unlikely but possible that the data migrates >>> *back* > before you do this re-check. > > Is there any way to correctly implement this without races? I'd imagine > you need a new primitive like KeyValueStore#get that atomically finds > the key or throws an exception if it is not in an owned partition > at the time of lookup so you know to recheck the partition and retry. > > Thoughts? > > Thanks again, > Steven > >>> >>> >
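A sketch of the retry loop this implies on the routing side, reusing Steven's Message/Key/RemoteInstance pseudocode names and assuming an instanceFor() helper that maps a HostInfo to its HTTP client, plus a "message-store" store name; as noted above, comparing the metadata before and after the remote call shrinks the race window but cannot eliminate it:

    // Sketch: treat a null as authoritative only if ownership was stable
    // across the whole lookup; otherwise re-route and try again.
    Message get(KafkaStreams streams, Serializer<Key> keySer, Key key) {
        for (int attempt = 0; attempt < 5; attempt++) {
            StreamsMetadata before = streams.metadataForKey("message-store", key, keySer);
            Message result = instanceFor(before.hostInfo()).get(key); // http remoting
            StreamsMetadata after = streams.metadataForKey("message-store", key, keySer);
            if (result != null)
                return result;
            if (before.hostInfo().equals(after.hostInfo()))
                return null; // metadata never moved; trust the miss
            // the partition migrated mid-request; loop and re-route
        }
        throw new IllegalStateException("store ownership kept moving; giving up");
    }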
Re: [DISCUSS]: KIP-161: streams record processing exception handlers
Comments inline: > On 5 Jun 2017, at 18:19, Jan Filipiak wrote: > > Hi > > just my few thoughts > > On 05.06.2017 11:44, Eno Thereska wrote: >> Hi there, >> >> Sorry for the late reply, I was out this past week. Looks like good progress >> was made with the discussions either way. Let me recap a couple of points I >> saw into one big reply: >> >> 1. Jan mentioned CRC errors. I think this is a good point. As these happen >> in Kafka, before Kafka Streams gets a chance to inspect anything, I'd like >> to hear the opinion of more Kafka folks like Ismael or Jason on this one. >> Currently the documentation is not great with what to do once a CRC check >> has failed. From looking at the code, it looks like the client gets a >> KafkaException (bubbled up from the fetcher) and currently we in streams >> catch this as part of poll() and fail. It might be advantageous to treat CRC >> handling in a similar way to serialisation handling (e.g., have the option >> to fail/skip). Let's see what the other folks say. Worst-case we can do a >> separate KIP for that if it proved too hard to do in one go. > there is no reasonable way to "skip" a CRC error. How can you know the length > you read was anything reasonable? You might be completely lost inside your > response. On the client side, every record received is checked for validity. As it happens, if the CRC check fails the exception is wrapped with a KafkaException that is thrown all the way to poll(). Assuming we change that and poll() throws a CRC exception, I was thinking we could treat it similarly to a deserialize exception and pass it to the exception handler to decide what to do. Default would be to fail. This might need a Kafka KIP btw and can be done separately from this KIP, but Jan, would you find this useful? >> >> >> At a minimum, handling this type of exception will need to involve the >> exactly-once (EoS) logic. We'd still allow the option of failing or >> skipping, but EoS would need to clean up by rolling back all the side >> effects from the processing so far. Matthias, how does this sound? > EoS will not help; the record might be 5 or 6 repartitions down into the > topology. I haven't followed but I pray you made EoS optional! We don't need > this and we don't want this and we will turn it off if it comes. So I > wouldn't recommend relying on it. The option to turn it off is better than > forcing it and still being unable to roll back bad pills (as explained before) >> Yeah as Matthias mentioned EoS is optional. Thanks, Eno >> 6. Will add an end-to-end example as Michael suggested. >> >> Thanks >> Eno >> >> >> >>> On 4 Jun 2017, at 02:35, Matthias J. Sax wrote: >>> >>> What I don't understand is this: >>> From there on it's the easiest way forward: fix, redeploy, start => done >>> If you have many producers that work fine and a new "bad" producer >>> starts up and writes bad data into your input topic, your Streams app >>> dies but all your producers, including the bad one, keep writing. >>> >>> Thus, how would you fix this, as you cannot "remove" the corrupted data >>> from the topic? It might take some time to identify the root cause and >>> stop the bad producer. Up to this point you get good and bad data into >>> your Streams input topic. If the Streams app is not able to skip over those >>> bad records, how would you get all the good data from the topic?
Not >>> saying it's not possible, but it's extra work copying the data with a >>> new non-Streams consumer-producer-app into a new topic and then feed >>> your Streams app from this new topic -- you also need to update all your >>> upstream producers to write to the new topic. >>> >>> Thus, if you want to fail fast, you can still do this. And after you >>> detected and fixed the bad producer you might just reconfigure your app >>> to skip bad records until it reaches the good part of the data. >>> Afterwards, you could redeploy with fail-fast again. >>> >>> >>> Thus, for this pattern, I actually don't see any reason to stop the >>> Streams app at all. If you have a callback, and use the callback to >>> raise an alert (and maybe get the bad data into a bad record queue), it >>> will not take longer to identify and stop the "bad" producer. But for >>> this case, you have zero downtime for your Streams app. >>> >>> This seems to be much simpler. Or do I miss anything? >>> >>> >>> Having said this, I agree that the "threshold based callback" might be >>> questionable. But as you argue for strict "fail-fast", I want to argue >>> that this must not always be the best pattern to apply and that the >>> overall KIP idea is super useful from my point of view. >>> >>> >>> -Matthias >>> >>> >>> On 6/3/17 11:57 AM, Jan Filipiak wrote: Could not agree more! But then I think the easiest is still: print exception and die. From there on it's the easiest way forward: fix, redeploy, start => done All the other ways to recover
Re: Kafka Streams Failed to rebalance error
I'm using Kafka Streams 0.10.2.1 and I still see this error 2017-06-07 20:28:37.211 WARN 73 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : Could not create task 0_31. Will retry. org.apache.kafka.streams.errors.LockException: task [0_31] Failed to lock the state directory for task 0_31 at org.apache.kafka.streams.processor.internals.ProcessorStateManager.(ProcessorStateManager.java:100) ~[kafka-streams-0.10.2.1.jar!/:na] at org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:73) ~[kafka-streams-0.10.2.1.jar!/:na] at org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:108) ~[kafka-streams-0.10.2.1.jar!/:na] at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) [kafka-streams-0.10.2.1.jar!/:na] at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) ~[kafka-streams-0.10.2.1.jar!/:na] at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) ~[kafka-streams-0.10.2.1.jar!/:na] at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) [kafka-streams-0.10.2.1.jar!/:na] at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) [kafka-streams-0.10.2.1.jar!/:na] at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) [kafka-streams-0.10.2.1.jar!/:na] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) [kafka-clients-0.10.2.1.jar!/:na] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) [kafka-clients-0.10.2.1.jar!/:na] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [kafka-clients-0.10.2.1.jar!/:na] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) [kafka-clients-0.10.2.1.jar!/:na] at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) [kafka-clients-0.10.2.1.jar!/:na] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [kafka-clients-0.10.2.1.jar!/:na] at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) [kafka-streams-0.10.2.1.jar!/:na] at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) [kafka-streams-0.10.2.1.jar!/:na] It has been printing it for hours now, so it does not recover at all. The most worrying thing is that this stream definition does not even use state stores, it literally looks like this: KStreamBuilder builder = new KStreamBuilder(); KStream kStream = builder.stream(appOptions.getInput().getTopic()); kStream.process(() -> processor); new KafkaStreams(builder, streamsConfiguration); The "processor" does its thing and calls "context().commit()" when done. That's it. Looking at the actual machine running the instance, the folders under /tmp/kafka-streams// only have a .lock file. This seems to have been bootstrapped by the exception: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. 
This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. We are addressing the latter by reducing "max.poll.records" and increasing "commit.interval.ms". Nonetheless, shouldn't Kafka Streams not worry about state dirs if there are no state stores? Since it doesn't seem to do so automatically, can I configure it somehow to achieve this end? Additionally, what could lead to it not being able to recover? On Tue, May 16, 2017 at 3:17 PM Matthias J. Sax wrote: > Great! :) > > On 5/16/17 2:31 AM, Sameer Kumar wrote: > > I see now that my Kafka cluster is very stable, and these errors don't > come > > now. > > > > -Sameer. > > > > On Fri, May 5, 2017 at 7:53 AM, Sameer Kumar > wrote: > > > >> Yes, I have upgraded my cluster and client both to version 10.2.1 and > >> currently monitoring the situation. > >> Will report back in case I find any errors. Thanks for the help though. > >> > >> -Sameer. > >> > >> On Fri, May 5, 2017 at 3:37 AM, Matthias J. Sax > >> wrote: > >> > >>> Did you see Eno's reply? > >>> > >>> Please try out Streams 0.10.2.1 -- this should be fixed there. If not, > >>> please report back. > >>> > >>> I would also recommend to subscribe to the list. It's self-service > >>> http://kafka.apache.org/contact > >>> > >>> > >>> -Matthias > >>> > >>> On 5/3
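A sketch of the tuning João describes, as it would be passed through a 0.10.x StreamsConfig (consumer-level keys are forwarded to the embedded consumer); the values are placeholders to show direction, not recommendations:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");      // placeholder
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
    // Fewer records per poll() so each loop iteration finishes well inside
    // max.poll.interval.ms and the group does not rebalance mid-commit.
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
    // Commit less frequently so more of each poll cycle goes to processing.
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);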
Re: Async Non Blocking Kafka Producer
Thanks Hans. It would work, but the producer will start losing data even when the cluster is available. Thanks Ankit Jain On Wed, Jun 7, 2017 at 12:56 PM, Hans Jespersen wrote: > Try adding props.put("max.block.ms", "0"); > > -hans > > > > > On Jun 7, 2017, at 12:24 PM, Ankit Jain wrote: > > Hi, > > We want to use the non-blocking Kafka producer. The producer thread should > > not block if the Kafka cluster is down or not reachable. > > Currently, we are setting the following properties, but the producer thread is > > still blocking if the Kafka cluster goes down or is unreachable. > > * props.put("block.on.buffer.full", "false");* > > * props.put("acks", "0");* > > -- > > Thanks > -- Thanks, Ankit Jain
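A sketch of the trade-off under discussion: with max.block.ms=0 the send() call returns immediately even when the cluster is unreachable, and the callback becomes the only place a failed hand-off surfaces, so records are dropped unless the callback diverts them somewhere. Topic name and serializers below are placeholders.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // placeholder
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("max.block.ms", "0"); // never block the calling thread on metadata/buffer
    props.put("acks", "0");         // fire and forget

    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    producer.send(new ProducerRecord<>("my-topic", "key", "value"), (metadata, exception) -> {
        if (exception != null) {
            // Rejected without blocking (e.g. metadata unavailable because the
            // cluster is down); spool or count the record here, or it is lost.
        }
    });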
FATAL [mirrormaker-thread-1] Mirror maker thread failure due to (kafka.tools.MirrorMaker$MirrorMakerThread)
Can someone please help me with this error? It is happening after an upgrade from 0.8.2 to 0.10.2.1. It seems to be an issue with my consumers, but I cannot determine what is happening. INFO Kafka commitId : e89bffd6b2eff799 (org.apache.kafka.common.utils.AppInfoParser) [2017-06-07 12:24:45,497] INFO [mirrormaker-thread-0] Starting mirror maker thread mirrormaker-thread-0 (kafka.tools.MirrorMaker$MirrorMakerThread) [2017-06-07 12:24:45,497] INFO [mirrormaker-thread-1] Starting mirror maker thread mirrormaker-thread-1 (kafka.tools.MirrorMaker$MirrorMakerThread) [2017-06-07 12:24:48,619] INFO Discovered coordinator app458.sjc2.mytest.com:9092 (id: 2147483613 rack: null) for group MirrorMaker_hkg1. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [2017-06-07 12:24:48,620] INFO Discovered coordinator app458.sjc2.mytest.com:9092 (id: 2147483613 rack: null) for group MirrorMaker_hkg1. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [2017-06-07 12:24:48,625] INFO Revoking previously assigned partitions [] for group MirrorMaker_hkg1 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [2017-06-07 12:24:48,625] INFO Revoking previously assigned partitions [] for group MirrorMaker_hkg1 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [2017-06-07 12:24:48,648] INFO (Re-)joining group MirrorMaker_hkg1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [2017-06-07 12:24:48,649] INFO (Re-)joining group MirrorMaker_hkg1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [2017-06-07 12:24:53,560] FATAL [mirrormaker-thread-1] Mirror maker thread failure due to (kafka.tools.MirrorMaker$MirrorMakerThread) org.apache.kafka.common.KafkaException: Unexpected error from SyncGroup: The server experienced an unexpected error when processing the request at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:548) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:521) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:784) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:765) at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186) at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149) at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:493) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:322) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:347) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) at
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) at kafka.tools.MirrorMaker$MirrorMakerNewConsumer.receive(MirrorMaker.scala:625) at kafka.tools.MirrorMaker$MirrorMakerThread.run(MirrorMaker.scala:431)
Re: Kafka Over TLS Error - Failed to send SSL Close message - Broken Pipe
Hi All, Thanks a lot for your help. A bug has been logged for this issue and can be found at https://issues.apache.org/jira/browse/KAFKA-5401. Thanks again. On Sun, Jun 4, 2017 at 6:38 PM, Martin Gainty wrote: > > > From: IT Consultant <0binarybudd...@gmail.com> > Sent: Friday, June 2, 2017 11:02 AM > To: users@kafka.apache.org > Subject: Kafka Over TLS Error - Failed to send SSL Close message - Broken > Pipe > > Hi All, > > I have been seeing the below error for three days. > > Can you please help me understand more about this? > > > WARN Failed to send SSL Close message > (org.apache.kafka.common.network.SslTransportLayer) > java.io.IOException: Broken pipe > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > at sun.nio.ch.IOUtil.write(IOUtil.java:65) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) > at > org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:194) > > MG>Here is org.apache.kafka.common.network.SslTransportLayer code: > /** > * Flushes the buffer to the network, non blocking > * @param buf ByteBuffer > * @return boolean true if the buffer has been emptied out, false > otherwise > * @throws IOException > */ > private boolean flush(ByteBuffer buf) throws IOException { > int remaining = buf.remaining(); > if (remaining > 0) { > int written = socketChannel.write(buf); //no check for > isOpen() *socketChannel.isOpen()* > return written >= remaining; > } > return true; > } > > MG>it appears the upstream monitor *container* closed the connection, but the kafka > socketChannel never tested the (now-closed) connection with isOpen() > MG>I think you found a bug > MG>can you file a bug in kafka-jira? > https://issues.apache.org/jira/browse/KAFKA/?selectedTab=com.atlassian.jira.jira-projects-plugin:summary-panel > > at > org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:148) > at > org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:45) > at > org.apache.kafka.common.network.Selector.close(Selector.java:442) > at org.apache.kafka.common.network.Selector.poll(Selector.java:310) > at > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256) > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216) > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128) > at java.lang.Thread.run(Thread.java:745) > > > Thanks a lot. >
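For reference, a sketch of the guard Martin is pointing at -- checking the channel before writing -- not the actual patch that eventually addressed KAFKA-5401; socketChannel is the field from the quoted class:

    // Sketch only: fail deliberately instead of writing the SSL close message
    // into a socket the peer (or container) has already torn down.
    private boolean flush(ByteBuffer buf) throws IOException {
        int remaining = buf.remaining();
        if (remaining > 0) {
            if (!socketChannel.isOpen())
                throw new IOException("channel closed before SSL close message was flushed");
            int written = socketChannel.write(buf);
            return written >= remaining;
        }
        return true;
    }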
Re: Finding StreamsMetadata with value-dependent partitioning
Thank you for the idea, I'll keep that in mind if I run into limitations of my current approach. > On Jun 6, 2017, at 5:50 PM, Guozhang Wang wrote: > > Thanks Steven, interesting use case. > > The current streams state store metadata discovery is assuming the > `DefaultStreamPartitioner` is used, which is a limitation for such cases. > > Another workaround that I can think of is, that you can first partition on > D in the first stage to let the workers do the "real" work, then you can > pipe it to a second stage where you re-partition on K, and the second > processor is only for materializing the store for querying. I'm not sure if > it would be better since it may require doubling the store spaces (one on > the first processor and one on the second), and since you can hold the > whole K -> D map in a global state it seems this map is small enough so > maybe not worth the repartitioning. > > > Guozhang > > > > > > > On Tue, Jun 6, 2017 at 8:36 AM, Michael Noll wrote: > >> Happy to hear you found a working solution, Steven! >> >> -Michael >> >> >> >> On Sat, Jun 3, 2017 at 12:53 AM, Steven Schlansker < >> sschlans...@opentable.com> wrote: >> On Jun 2, 2017, at 3:32 PM, Matthias J. Sax >>> wrote: Thanks. That helps to understand the use case better. Rephrase to make sure I understood it correctly: 1) you are providing a custom partitioner to Streams that is based on >> one field in your value (that's fine with regard to fault-tolerance :)) 2) you want to use interactive queries to query the store 3) because of your custom partitioning schema, you need to manually figure out the right application instance that hosts a key 4) thus, you use a GlobalKTable to maintain the information from K to D and thus to the partition, i.e., streams instance that hosts K If this is correct, then you cannot use the "by key" metadata >> interface. It's designed to find the streams instance based on the key only -- but your partitioner is based on the value. Internally, we call > final Integer partition = partitioner.partition(key, null, >>> sourceTopicsInfo.maxPartitions); Note, that `value==null` -- at this point, we don't have any value available and can't provide it to the partitioner. Thus, your approach to get all metadata is the only way you can go. >>> >>> Thanks for confirming this. The code is a little ugly but I've done >> worse >>> :) >>> Very interesting (and quite special) use case. :) -Matthias On 6/2/17 2:32 PM, Steven Schlansker wrote: > >> On Jun 2, 2017, at 2:11 PM, Matthias J. Sax >>> wrote: >> >> I am not sure if I understand the use case correctly. Could you give >> some more context? > > Happily, thanks for thinking about this! > >> >>> backing store whose partitioning is value dependent >> >> I infer that you are using a custom store and not default RocksDB? >> If >> yes, what do you use? What does "value dependent" mean in this >> context? > > We're currently using the basic in-memory store. We tried to use >> RocksDB > but the tuning to get it running appropriately in a Linux container >>> without > tripping the cgroups OOM killer is nontrivial. > > >> Right now, I am wondering, why you not just set a new key to get your >> data grouped by the field you are interesting in? Also, if you don't >> partition your data by key, you might break your streams >> application >> with regard to fault-tolerance -- or does your custom store not rely >> on >> changelog backup for fault-tolerance? >> > > That's an interesting point about making a transformed key.
But I don't >>> think > it simplifies my problem too much. Essentially, I have a list of >>> messages > that should get delivered to destinations. Each message has a primary >>> key K > and a destination D. > > We partition over D so that all messages to the same destination are >>> handled by > the same worker, to preserve ordering and implement local rate limits >>> etc. > > I want to preserve the illusion to the client that they can look up a >>> key with > only K. So, as an intermediate step, we use the GlobalKTable to look >>> up D. Once > we have K,D we can then compute the partition and execute a lookup. > > Transforming the key to be a composite K,D isn't helpful because the >>> end user still > only knows K -- D's relevance is an implementation detail I wish to >>> hide -- so you still > need some sort of secondary lookup. > > We do use the changelog backup for fault tolerance -- how would having >>> the partition > based on the value break this? Is the changelog implicitly >> partitioned >>> by a partitioner > other than the one we give to the topology? >
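A sketch of the value-dependent partitioner described above, assuming a Message value type with a getDestination() accessor (both hypothetical); the bit mask is the usual hash-to-positive idiom:

    import org.apache.kafka.streams.processor.StreamPartitioner;

    // Partition on the destination D carried inside the value, so every
    // message for one destination is handled by the same task/worker.
    public class DestinationPartitioner implements StreamPartitioner<String, Message> {
        @Override
        public Integer partition(String key, Message value, int numPartitions) {
            // Note: as Matthias explains, metadata lookups "by key" invoke
            // partitioners with value == null, which is why the GlobalKTable
            // K -> D hop is needed before the partition can be computed.
            return (value.getDestination().hashCode() & 0x7fffffff) % numPartitions;
        }
    }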
Re: kafka streams changelog topic value
The right way to see the changelog persisted by RocksDB is to use the ByteArrayDeserializer and then decode the hex to a string:

    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.ByteArrayDeserializer");

    for (ConsumerRecord record : consumerRecords) {
        print bytesToHexString(record.value())
    }

    public static String bytesToHexString(byte[] src) {
        StringBuilder stringBuilder = new StringBuilder("");
        if (src == null || src.length <= 0) {
            return null;
        }
        for (int i = 0; i < src.length; i++) {
            int v = src[i] & 0xFF;
            String hv = Integer.toHexString(v);
            if (hv.length() < 2) {
                stringBuilder.append(0);
            }
            stringBuilder.append(hv);
        }
        return stringBuilder.toString();
    }

output now collect
p:0,o:0,k:msg1,v:0001
p:0,o:1,k:msg3,v:0001
p:0,o:2,k:msg5,v:0001
p:0,o:3,k:msg1,v:0002
p:0,o:4,k:msg3,v:0002
p:1,o:0,k:msg2,v:0001
p:1,o:1,k:msg4,v:0001
p:1,o:2,k:msg2,v:0002
p:1,o:3,k:msg2,v:0003

2017-06-07 18:42 GMT+08:00 john cheng : > I add some log on StoreChangeLog > > for (K k : this.dirty) { > V v = getter.get(k); > log.info("logChange key:{},value:{}", k, v); > collector.send(new ProducerRecord<>(this.topic, this.partition, k, v), > keySerializer, valueSerializer); > } > > and found the print result is normal, just some bytes: > > [2017-06-07 18:39:43,131] INFO logChange removed:[], dirty:[kafka] > (org.apache.kafka.streams.state.internals.StoreChangeLogger) > [2017-06-07 18:39:43,132] INFO logChange key:kafka,value:[0, 0, 0, 2] > (org.apache.kafka.streams.state.internals.StoreChangeLogger) > > but console-consumer still output kafka: > > so this may be problem of console-consumer. > > > 2017-06-07 18:24 GMT+08:00 john cheng : > >> I'm running WordCountProcessorDemo with Processor API. and change >> something below >> 1. config 1 stream-thread and 1 replicas >> 2. change inMemory() to persistent() >> MyKakfa version is 0.10.0.0. After running streaming application, I check >> msg output by console-consumer >> ➜ kafka_2.10-0.10.0.0 bin/kafka-console-producer.sh --broker-list >> localhost:9092 --topic streams-input2 >> msg1 # by console-producer, we can only produce message's value. so >> message produce to input topic will use roundrobbin partition >> msg2 >> msg3 >> msg4 >> msg5 >> msg6 >> ➜ kafka_2.10-0.10.0.0 bin/kafka-console-consumer.sh --new-consumer >> --bootstrap-server localhost:9092 --topic streams-input2 --property >> print.key=true --property key.separator=":" --from-beginning >> null:msg2 # key is null, value is what we produce above >> null:msg4 >> null:msg6 >> null:msg1 >> null:msg3 >> null:msg5 >> ➜ kafka_2.10-0.10.0.0 bin/kafka-console-consumer.sh --new-consumer >> --bootstrap-server localhost:9092 --topic streams-output2 --property >> print.key=true --property key.separator=":" --from-beginning >> msg2:1 >> msg1:1 >> msg1:1 >> msg3:1 >> msg2:1 >> msg4:1 >> msg1:1 >> msg3:1 >> msg5:1 >> msg2:1 >> msg4:1 >> msg6:1 # due to log compaction, same key will be overwrite. this is ok... >> ➜ kafka_2.10-0.10.0.0 bin/kafka-console-consumer.sh --new-consumer >> --bootstrap-server localhost:9092 --topic streams-wordcount-Counts-changelog >> --property print.key=true --property key.separator=":" --from-beginning >> *msg2:* >> *msg4:* >> *msg6:* >> *msg1:* >> *msg3:* >> *msg5:* >> Everything is ok, except changelog-topic trouble me. Why it's value is >> empty? >> >> I have dig into source code, and summary the workflow : >> 1.
when WordCountProcessorDemo's ProcessorNode put kv to KeyValueStore, >> no matter Memory or RocksDB, they both put into local storage, then append >> msg to StoreChangeLogger >> 2. the key append to StoreChangeLogger first, and invoke maybeLogChange >> by passing ValueGetter, this getter will get value from local storage when >> logChange() operation happen >> 3. when logChange() on StoreChangeLogger happen, send KeyValue message to >> changelog topic, here is streams-wordcount-Counts-changelog >> 4. StoreChangeLogger use dirty and remove Set to swap between logChange() >> and add(). >> >> for (K k : this.dirty) { // logChange() method flush dirty to changelog topic >> V v = getter.get(k); // value getter will get value from local storage >> collector.send(new ProducerRecord<>(this.topic, this.partition, k, v), >> keySerializer, valueSerializer); >> } >> >> >> The Only reason I can figure out why value is empty is when invoke >> KeyValueStore's delete method. But Actualy the application did't do it >> >> Anyone help me, Tks . >> >> >
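The changelog values are not actually empty: [0, 0, 0, 2] is a 4-byte big-endian integer (the count 2), which the console consumer prints as unprintable characters. Rather than hex-dumping, a minimal consumer sketch that decodes them directly, assuming the demo's String keys and Integer values (group id is a placeholder):

    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "changelog-debug"); // placeholder group id
    props.put("auto.offset.reset", "earliest");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");

    try (KafkaConsumer<String, Integer> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(Arrays.asList("streams-wordcount-Counts-changelog"));
        while (true) {
            ConsumerRecords<String, Integer> records = consumer.poll(1000);
            for (ConsumerRecord<String, Integer> record : records)
                System.out.printf("%s:%d%n", record.key(), record.value()); // e.g. kafka:2
        }
    }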
Re: Cluster in weird state: no leaders no ISR for all topics, but it works!
Hi, I have followed the instructions you detail and I could create topics, which were getting a leader and were properly replicated. I think the problem I experienced was due to some old temporary communication problems between Kafka and Zookeeper. But that's only a guess. Thanks a lot Mohammed for your time. Alberto del Barrio. On 5 June 2017 at 10:38, Mohammed Manna wrote: > Hi, > > I set up a fresh cluster (3 brokers, 3 ZooKeepers) and created a topic > according to your settings - obviously the log directories are kept > separate e.g. (var/lib/zookeeper2 and var/lib/zookeeper3) and not to > mention the myid files for every zookeeper to identify themselves in the > ensemble. Cannot see any issues. > > Do you mind trying it out with fresh quorum information - please do the > following: > > 1) Delete the zookeeper logs from the log and dataLogDir locations > 2) In the zookeeper.properties for Kafka ensure that the zookeeper config > specifies different log directories for every zookeeper. > 3) Restart the cluster in the following pattern: > a) Start your zookeepers and allow ~5s waiting time between each, > b) Start your kafka brokers and allow ~5s waiting time between each. > > 4) Create a topic using the console utility and do a --describe on the topic - > > > let us know. > > KR, > > > > On 2 June 2017 at 13:55, Del Barrio, Alberto < > alberto.delbar...@360dialog.com> wrote: > > > So, I fixed the problem doing a rolling restart, and after some checks > > it seems there was no data loss. > > > > On 1 June 2017 at 17:57, Del Barrio, Alberto < > > alberto.delbar...@360dialog.com> wrote: > > > > > I might give it a try tomorrow. The reason for having so large init and > > > sync limit times is because in the past our ZK cluster was storing > large > > > amount of data, and lower values were not enough for the server syncs > > when > > > restarting zk processes. > > > > > > On 1 June 2017 at 17:52, Mohammed Manna wrote: > > > > > >> Cool - I will try and take a look into this - Meanwhile, do you mind > > >> awfully > > >> to change the following and see if things improve? > > >> > > >> tickTime = 1000 > > >> initLimit=3 > > >> syncLimit=5 > > >> > > >> On 1 June 2017 at 16:49, Del Barrio, Alberto < > > >> alberto.delbar...@360dialog.com> wrote: > > >> > > >> > Here are the configs you were asking for: > > >> > > > >> > Zookeeper: > > >> > tickTime=1000 > > >> > initLimit=2000 > > >> > syncLimit=1000 > > >> > dataDir=/var/lib/zookeeper > > >> > clientPort=2181 > > >> > server.3=10.0.0.3:2888:3888 > > >> > server.2=10.0.0.2:2888:3888 > > >> > server.1=10.0.0.1:2888:3888 > > >> > > > >> > > > >> > Kafka broker (for one of them): > > >> > broker.id=10 > > >> > listeners=PLAINTEXT://10.0.0.4:9092 > > >> > num.network.threads=3 > > >> > num.io.threads=8 > > >> > socket.send.buffer.bytes=102400 > > >> > socket.receive.buffer.bytes=102400 > > >> > socket.request.max.bytes=104857600 > > >> > log.dirs=/var/lib/kafka > > >> > num.partitions=2 > > >> > num.recovery.threads.per.data.dir=1 > > >> > zookeeper.connect=10.0.0.1:2181,10.0.0.2:2181,10.0.0.3:2181/kafka > > >> > zookeeper.connection.timeout.ms=6000 > > >> > > > >> > In general they're pretty much the default ones. > > >> > I can see in Zookeeper the kafka brokers connected to it and > exchanging > > >> > data... > > >> > > > >> > Thanks for your help and time. > > >> > > > >> > On 1 June 2017 at 17:32, Mohammed Manna wrote: > > >> > > > >> > > Could you please share your broker/zookeeper/topic configs ?
> > >> > > > > >> > > On 1 June 2017 at 16:18, Del Barrio, Alberto < > > >> > > alberto.delbar...@360dialog.com> wrote: > > >> > > > > >> > > > I tried creating the topic and results are very similar to the > > >> current > > >> > > > situation: there are not ISR and no leader for any of the > > >> partitions, > > >> > but > > >> > > > now kafka-topics shows *Leader: none* when for all the other > > >> topics, it > > >> > > > shows *Leader: -1* > > >> > > > > > >> > > > > > >> > > > On 1 June 2017 at 17:05, Mohammed Manna > > wrote: > > >> > > > > > >> > > > > I had a similar situation, but only 1 of my ZKs was > struggling - > > >> but > > >> > > > since > > >> > > > > the ISR synching time is configurable I was confident to > bounce > > 1 > > >> ZK > > >> > > at a > > >> > > > > time and it worked out. > > >> > > > > does it happen even when you create a new topic with a > > >> > > > > replication:partition ration of 1? > > >> > > > > > > >> > > > > i meant, 3 replicas, 3 partitions :) > > >> > > > > > > >> > > > > On 1 June 2017 at 15:58, Del Barrio, Alberto < > > >> > > > > alberto.delbar...@360dialog.com> wrote: > > >> > > > > > > >> > > > > > Hi Mohammed, > > >> > > > > > > > >> > > > > > thanks for your answer. > > >> > > > > > The ZK cluster is not located in the servers where Kafka > runs > > >> but > > >> > in > > >> > > > > other > > >> > > > > > 3 different machines. This ZK cluster is used by several > other > > >> > > services >
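Spelling out Mohammed's step 4 with the thread's own ZooKeeper connect string (the topic name is a placeholder); a healthy cluster shows a numeric Leader and a fully populated Isr for every partition:

    bin/kafka-topics.sh --zookeeper 10.0.0.1:2181,10.0.0.2:2181,10.0.0.3:2181/kafka \
      --create --topic leader-check --partitions 3 --replication-factor 3
    bin/kafka-topics.sh --zookeeper 10.0.0.1:2181,10.0.0.2:2181,10.0.0.3:2181/kafka \
      --describe --topic leader-check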
Re: Debugging Kafka Streams Windowing
Mahendra, Did increasing those two properties do the trick? I am running into this exact issue testing streams out on a single Kafka instance. Yet I can manually start a consumer and read the topics fine while it's busy doing this dead stuff. On Tue, May 23, 2017 at 12:30 AM, Mahendra Kariya < mahendra.kar...@go-jek.com> wrote: > On 22 May 2017 at 16:09, Guozhang Wang wrote: > > > For > > that issue I'd suspect that there is a network issue, or maybe the > network > > is just saturated already and the heartbeat request / response were not > > exchanged in time between the consumer and the broker, or the sockets > being > > dropped because of socket limit. Under these cases not all consumers may > be > > affected, but since the associated issue is from the "AbstractCoordinator" > > class which is part of the consumer client, I'd still be surprised if it > is > > actually due to Streams itself with the same consumer config settings, > but > > not to consumers. > > > > Yes. This is the conclusion that even we are coming to after further > investigation. But didn't want to post it here until we were sure. > > We are experimenting with increasing the default timeouts, particularly > heartbeat.interval.ms and session.timeout.ms. So far, things have been > running fine. But we will let it run for a few more days before closing > this issue. >
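For reference, the two consumer settings Mahendra mentions, as they would be passed through a 0.10.x StreamsConfig; the values are placeholders (the heartbeat interval is conventionally kept under a third of the session timeout):

    import java.util.Properties;

    Properties props = new Properties();
    // Survive short network stalls without being evicted from the group.
    props.put("session.timeout.ms", 30000);
    props.put("heartbeat.interval.ms", 10000);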
Re: Losing messages in Kafka Streams after upgrading
I tried to use a TimestampExtractor that uses our timestamps from the messages, and a 'map' operation on the KTable to set it to the current time, to have a precise point where I discard our original timestamps. That does not work (I verified by writing a separate Java Kafka consumer and spitting out the timestamps), as the TimestampExtractor only gets called once, and it will stick with that time. I did not really have a good reason not to simply use the WallclockTimestampExtractor, and that one seems to do exactly what I wanted. So, I'm good! I am interested in the community discussion Guozhang mentions. Is there a KIP for that? regards, Frank On Mon, Jun 5, 2017 at 8:25 PM, Matthias J. Sax wrote: > Frank, > > If you use "now", I assume you are calling System.currentTimeMillis(). > If yes, you can also use the predefined WallclockTimestampExtractor that > ships with Streams (no need to write your own one). > > > I thought that the Timestamp extractor would then also use > >> that updated timestamp as 'stream time', but I don't really see that > >> happening, so that assumption was wrong. > > Yes, this should happen. Not sure why you don't observe this. And thus, > the producer should use this timestamp to write the records. > > How did you verify the timestamps that are set for your output records? > > > -Matthias > > > On 6/5/17 6:15 AM, Frank Lyaruu wrote: > > Thanks Guozhang, > > > > I figured I could use a custom timestamp extractor, and set that > timestamp > > to 'now' when reading a source topic, as the original timestamp is pretty > > much irrelevant. I thought that the Timestamp extractor would then also > use > > that updated timestamp as 'stream time', but I don't really see that > > happening, so that assumption was wrong. > > > > If I could configure a timestamp extractor that would also be used by the > > producer I think I'd be in business, but right now I don't see an elegant > > way forward, so any ideas for workarounds are welcome. > > > > regards, Frank > > > > On Mon, Jun 5, 2017 at 7:01 AM, Guozhang Wang > wrote: > > > >> Frank, thanks for sharing your findings. > >> > >> I think this is a general issue to consider in Streams, and the > community > >> has been thinking about it: we write intermediate topics with the stream > >> time that is inherited from the source topic's timestamps, however that > >> timestamp is used for log rolling / retention etc as well, and these two > >> purposes (use timestamps in processing for out-of-ordering and late > >> arrivals, and operations on the Kafka topics) could rely on different > >> timestamp semantics. We need to revisit how timestamps can be maintained > >> across the topology in Streams. > >> > >> Guozhang > >> > >> On Sat, Jun 3, 2017 at 10:54 AM, Frank Lyaruu > wrote: > >> > >>> Hi Matthias, > >>> > >>> Ok, that clarifies quite a bit. I never really went into the timestamp > >>> aspects, as time does not really play a role in my application (aside > >> from > >>> the repartition topics, I have no KStreams or Windowed operation, just > >>> different kind of KTable join). > >>> > >>> I do think that the fail case I see (With this version joining two 'old' > >>> KTables causes a small percentage of records to vanish) is far from > >>> intuitive, and it somehow worked fine until a few weeks ago. > >>> > >>> I think your option 3 should work. I'll make a custom timestamp extractor > >>> (I actually do have a timestamp in my messages), and I'll set it to the > >>> current time as they enter the streams application.
> >>> > >>> Thanks, that helped, regards, Frank > >>> > >>> On Fri, Jun 2, 2017 at 9:17 PM, Matthias J. Sax > > >>> wrote: > >>> > Hi Frank, > > yes, retention policy is based on the embedded record timestamps and > >> not > on system time. Thus, if you send messages with an old timestamp, they > can trigger log/segment rolling. > > >> I see that the repartition topics have timestamp.type = CreateTime, > >>> does > >> that mean it uses the timestamp of the > >> original message? > > Yes. That's the default setting on the broker side. For Streams, we > maintain a so-called "stream time" that is computed based on the input > record timestamps. This "stream time" is used to set the timestamp for > records that are written by Stream. (so it's more or less the > timestamp > of the input records). > > >> Shouldn't that be LogAppendTime for repartition topics? > > No. Streams needs to preserve the original timestamp to guaranteed > correct semantics for downstream window operations. Thus, it should be > CreateTime -- if you switch to LogAppendTime, you might break your > application logic and get wrong results. > > >> Or is there a way to configure that? > > You can configure this on a per topic basis on the brokers. > > >> If I hack into my Kafka streams code to force it to use > >> LogAppendTime > se
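What Frank settled on is effectively the built-in WallclockTimestampExtractor; for anyone who wants the custom variant (e.g., to fall back to wall-clock time only when a message's own timestamp is unusable), a sketch against the 0.10.2-era single-argument interface, registered via StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    // Stamps every record with processing time, discarding the embedded
    // producer timestamp -- the behavior Frank describes wanting.
    public class ProcessingTimeExtractor implements TimestampExtractor {
        @Override
        public long extract(ConsumerRecord<Object, Object> record) {
            return System.currentTimeMillis();
        }
    }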
ACLs for regular expression
Hi, Is it possible to give ACLs using a regular expression for group names? For example, I want to give Read access to all group names with the prefix DNS*. Thanks
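For what it's worth, the 0.10.x authorizer matches resource names literally, plus the special wildcard '*', so there is no regex or prefix matching; a DNS* family needs one ACL per group (principal and group names below are hypothetical):

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
      --add --allow-principal User:alice --operation Read --group DNS-app1
    # Repeat per group, or use --group '*' to cover every group (much broader
    # than a prefix).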