Re: Kafka Streams Failed to rebalance error

2017-06-07 Thread João Peixoto
But my stream definition does not have a state store at all, RocksDB or in-memory...
That's the most concerning part...
On Wed, Jun 7, 2017 at 9:48 PM Sachin Mittal  wrote:

> One instance with 10 threads may cause RocksDB issues.
> How much RAM do you have?
>
> Also check CPU wait time. Many RocksDB instances on one machine (depending
> on the number of partitions) may cause a lot of disk I/O, causing wait times to
> increase and hence slowing down message processing, which causes frequent
> rebalances.
>
> Also, how many partitions does your topic have? My experience is that having
> one thread per partition is ideal.
>
> Thanks
> Sachin
>
>
> On Thu, Jun 8, 2017 at 9:58 AM, João Peixoto 
> wrote:
>
> > There is one instance with 10 threads.
> >
> > On Wed, Jun 7, 2017 at 9:07 PM Guozhang Wang  wrote:
> >
> > > João,
> > >
> > > Do you also have multiple running instances in parallel, and how many
> > > threads are you running within each instance?
> > >
> > > Guozhang
> > >
> > >
> > > On Wed, Jun 7, 2017 at 3:18 PM, João Peixoto 
> > > wrote:
> > >
> > > > Eno before I do so I just want to be sure this would not be a
> > duplicate.
> > > I
> > > > just found the following issues:
> > > >
> > > > * https://issues.apache.org/jira/browse/KAFKA-5167. Marked as being
> > > fixed
> > > > on 0.11.0.0/0.10.2.2 (both not released afaik)
> > > > * https://issues.apache.org/jira/browse/KAFKA-5070. Currently in
> > > progress
> > > >
> > > > On Wed, Jun 7, 2017 at 2:24 PM Eno Thereska 
> > > > wrote:
> > > >
> > > > > Hi there,
> > > > >
> > > > > This might be a bug, would you mind opening a JIRA (copy-pasting
> > below
> > > is
> > > > > sufficient).
> > > > >
> > > > > Thanks
> > > > > Eno
> > > > > > On 7 Jun 2017, at 21:38, João Peixoto 
> > > wrote:
> > > > > >
> > > > > > I'm using Kafka Streams 0.10.2.1 and I still see this error
> > > > > >
> > > > > > 2017-06-07 20:28:37.211  WARN 73 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : Could not create task 0_31. Will retry.
> > > > > >
> > > > > > org.apache.kafka.streams.errors.LockException: task [0_31] Failed to lock the state directory for task 0_31
> > > > > > at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100) ~[kafka-streams-0.10.2.1.jar!/:na]
> > > > > > at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) ~[kafka-streams-0.10.2.1.jar!/:na]
> > > > > > at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) ~[kafka-streams-0.10.2.1.jar!/:na]
> > > > > > at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) [kafka-streams-0.10.2.1.jar!/:na]
> > > > > > at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) ~[kafka-streams-0.10.2.1.jar!/:na]
> > > > > > at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) ~[kafka-streams-0.10.2.1.jar!/:na]
> > > > > > at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) [kafka-streams-0.10.2.1.jar!/:na]
> > > > > > at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) [kafka-streams-0.10.2.1.jar!/:na]
> > > > > > at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) [kafka-streams-0.10.2.1.jar!/:na]
> > > > > > at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) [kafka-clients-0.10.2.1.jar!/:na]
> > > > > > at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) [kafka-clients-0.10.2.1.jar!/:na]
> > > > > > at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [kafka-clients-0.10.2.1.jar!/:na]
> > > > > > at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) [kafka-clients-0.10.2.1.jar!/:na]
> > > > > > at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) [kafka-clients-0.10.2.1.jar!/:na]
> > > > > > at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaCons

Re: Kafka Streams Failed to rebalance error

2017-06-07 Thread Sachin Mittal
One instance with 10 threads may cause RocksDB issues.
How much RAM do you have?

Also check CPU wait time. Many RocksDB instances on one machine (depending
on the number of partitions) may cause a lot of disk I/O, causing wait times to
increase and hence slowing down message processing, which causes frequent
rebalances.

Also, how many partitions does your topic have? My experience is that having
one thread per partition is ideal.
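As a rough illustration of that one-thread-per-partition rule of thumb, here is a minimal
sketch with made-up numbers (a 10-partition topic split across 2 instances, 5 threads each);
the property is org.apache.kafka.streams.StreamsConfig.NUM_STREAM_THREADS_CONFIG:

Properties props = new Properties();
// with 10 partitions and 2 instances, 5 threads per instance gives roughly one thread
// per partition without oversubscribing a single box with RocksDB instances
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 5);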

Thanks
Sachin


On Thu, Jun 8, 2017 at 9:58 AM, João Peixoto 
wrote:

> There is one instance with 10 threads.
>
> On Wed, Jun 7, 2017 at 9:07 PM Guozhang Wang  wrote:
>
> > João,
> >
> > Do you also have multiple running instances in parallel, and how many
> > threads are you running within each instance?
> >
> > Guozhang
> >
> >
> > On Wed, Jun 7, 2017 at 3:18 PM, João Peixoto 
> > wrote:
> >
> > > Eno before I do so I just want to be sure this would not be a
> duplicate.
> > I
> > > just found the following issues:
> > >
> > > * https://issues.apache.org/jira/browse/KAFKA-5167. Marked as being
> > fixed
> > > on 0.11.0.0/0.10.2.2 (both not released afaik)
> > > * https://issues.apache.org/jira/browse/KAFKA-5070. Currently in
> > progress
> > >
> > > On Wed, Jun 7, 2017 at 2:24 PM Eno Thereska 
> > > wrote:
> > >
> > > > Hi there,
> > > >
> > > > This might be a bug, would you mind opening a JIRA (copy-pasting
> below
> > is
> > > > sufficient).
> > > >
> > > > Thanks
> > > > Eno
> > > > > On 7 Jun 2017, at 21:38, João Peixoto 
> > wrote:
> > > > >
> > > > > I'm using Kafka Streams 0.10.2.1 and I still see this error
> > > > >
> > > > > 2017-06-07 20:28:37.211  WARN 73 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : Could not create task 0_31. Will retry.
> > > > >
> > > > > org.apache.kafka.streams.errors.LockException: task [0_31] Failed to lock the state directory for task 0_31
> > > > > at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100) ~[kafka-streams-0.10.2.1.jar!/:na]
> > > > > at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) ~[kafka-streams-0.10.2.1.jar!/:na]
> > > > > at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) ~[kafka-streams-0.10.2.1.jar!/:na]
> > > > > at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) [kafka-streams-0.10.2.1.jar!/:na]
> > > > > at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) ~[kafka-streams-0.10.2.1.jar!/:na]
> > > > > at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) ~[kafka-streams-0.10.2.1.jar!/:na]
> > > > > at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) [kafka-streams-0.10.2.1.jar!/:na]
> > > > > at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) [kafka-streams-0.10.2.1.jar!/:na]
> > > > > at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) [kafka-streams-0.10.2.1.jar!/:na]
> > > > > at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) [kafka-clients-0.10.2.1.jar!/:na]
> > > > > at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) [kafka-clients-0.10.2.1.jar!/:na]
> > > > > at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [kafka-clients-0.10.2.1.jar!/:na]
> > > > > at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) [kafka-clients-0.10.2.1.jar!/:na]
> > > > > at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) [kafka-clients-0.10.2.1.jar!/:na]
> > > > > at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [kafka-clients-0.10.2.1.jar!/:na]
> > > > > at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) [kafka-streams-0.10.2.1.jar!/:na]
> > > > > at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) [kafka-streams-0.10.2.1.jar!/:na]
> > > > >
> > > > >
> > > > > It has been printing it for hours now

Re: Kafka Streams Failed to rebalance error

2017-06-07 Thread João Peixoto
There is one instance with 10 threads.

On Wed, Jun 7, 2017 at 9:07 PM Guozhang Wang  wrote:

> João,
>
> Do you also have multiple running instances in parallel, and how many
> threads are you running within each instance?
>
> Guozhang
>
>
> On Wed, Jun 7, 2017 at 3:18 PM, João Peixoto 
> wrote:
>
> > Eno before I do so I just want to be sure this would not be a duplicate.
> I
> > just found the following issues:
> >
> > * https://issues.apache.org/jira/browse/KAFKA-5167. Marked as being
> fixed
> > on 0.11.0.0/0.10.2.2 (both not released afaik)
> > * https://issues.apache.org/jira/browse/KAFKA-5070. Currently in
> progress
> >
> > On Wed, Jun 7, 2017 at 2:24 PM Eno Thereska 
> > wrote:
> >
> > > Hi there,
> > >
> > > This might be a bug, would you mind opening a JIRA (copy-pasting below
> is
> > > sufficient).
> > >
> > > Thanks
> > > Eno
> > > > On 7 Jun 2017, at 21:38, João Peixoto 
> wrote:
> > > >
> > > > I'm using Kafka Streams 0.10.2.1 and I still see this error
> > > >
> > > > 2017-06-07 20:28:37.211  WARN 73 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : Could not create task 0_31. Will retry.
> > > >
> > > > org.apache.kafka.streams.errors.LockException: task [0_31] Failed to lock the state directory for task 0_31
> > > > at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100) ~[kafka-streams-0.10.2.1.jar!/:na]
> > > > at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) ~[kafka-streams-0.10.2.1.jar!/:na]
> > > > at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) ~[kafka-streams-0.10.2.1.jar!/:na]
> > > > at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) [kafka-streams-0.10.2.1.jar!/:na]
> > > > at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) ~[kafka-streams-0.10.2.1.jar!/:na]
> > > > at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) ~[kafka-streams-0.10.2.1.jar!/:na]
> > > > at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) [kafka-streams-0.10.2.1.jar!/:na]
> > > > at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) [kafka-streams-0.10.2.1.jar!/:na]
> > > > at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) [kafka-streams-0.10.2.1.jar!/:na]
> > > > at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) [kafka-clients-0.10.2.1.jar!/:na]
> > > > at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) [kafka-clients-0.10.2.1.jar!/:na]
> > > > at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [kafka-clients-0.10.2.1.jar!/:na]
> > > > at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) [kafka-clients-0.10.2.1.jar!/:na]
> > > > at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) [kafka-clients-0.10.2.1.jar!/:na]
> > > > at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [kafka-clients-0.10.2.1.jar!/:na]
> > > > at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) [kafka-streams-0.10.2.1.jar!/:na]
> > > > at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) [kafka-streams-0.10.2.1.jar!/:na]
> > > >
> > > >
> > > > It has been printing it for hours now, so it does not recover at all.
> > > > The most worrying thing is that this stream definition does not even
> > use
> > > > state stores, it literally looks like this:
> > > >
> > > > KStreamBuilder builder = new KStreamBuilder();
> > > >KStream kStream =
> > > > builder.stream(appOptions.getInput().getTopic());
> > > >kStream.process(() -> processor);
> > > >new KafkaStreams(builder, streamsConfiguration);
> > > >
> > > > The "processor" does its thing and calls "context().commit()" when
> > done.
> > > > That's it. Looking at the actual machine running the instance, the
> > > folders
> > > > under /tmp/kafka-streams// only have a .lock file.
> > > >
> > > > This seems to have been bootstrapped by the exception:
> > > >
> > > > org.apa

Re: Kafka Streams Failed to rebalance error

2017-06-07 Thread Guozhang Wang
João,

Do you also have multiple running instances in parallel, and how many
threads are you running within each instance?

Guozhang


On Wed, Jun 7, 2017 at 3:18 PM, João Peixoto 
wrote:

> Eno before I do so I just want to be sure this would not be a duplicate. I
> just found the following issues:
>
> * https://issues.apache.org/jira/browse/KAFKA-5167. Marked as being fixed
> on 0.11.0.0/0.10.2.2 (both not released afaik)
> * https://issues.apache.org/jira/browse/KAFKA-5070. Currently in progress
>
> On Wed, Jun 7, 2017 at 2:24 PM Eno Thereska 
> wrote:
>
> > Hi there,
> >
> > This might be a bug, would you mind opening a JIRA (copy-pasting below is
> > sufficient).
> >
> > Thanks
> > Eno
> > > On 7 Jun 2017, at 21:38, João Peixoto  wrote:
> > >
> > > I'm using Kafka Streams 0.10.2.1 and I still see this error
> > >
> > > 2017-06-07 20:28:37.211  WARN 73 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : Could not create task 0_31. Will retry.
> > >
> > > org.apache.kafka.streams.errors.LockException: task [0_31] Failed to lock the state directory for task 0_31
> > > at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100) ~[kafka-streams-0.10.2.1.jar!/:na]
> > > at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) ~[kafka-streams-0.10.2.1.jar!/:na]
> > > at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) ~[kafka-streams-0.10.2.1.jar!/:na]
> > > at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) [kafka-streams-0.10.2.1.jar!/:na]
> > > at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) ~[kafka-streams-0.10.2.1.jar!/:na]
> > > at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) ~[kafka-streams-0.10.2.1.jar!/:na]
> > > at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) [kafka-streams-0.10.2.1.jar!/:na]
> > > at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) [kafka-streams-0.10.2.1.jar!/:na]
> > > at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) [kafka-streams-0.10.2.1.jar!/:na]
> > > at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) [kafka-clients-0.10.2.1.jar!/:na]
> > > at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) [kafka-clients-0.10.2.1.jar!/:na]
> > > at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [kafka-clients-0.10.2.1.jar!/:na]
> > > at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) [kafka-clients-0.10.2.1.jar!/:na]
> > > at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) [kafka-clients-0.10.2.1.jar!/:na]
> > > at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [kafka-clients-0.10.2.1.jar!/:na]
> > > at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) [kafka-streams-0.10.2.1.jar!/:na]
> > > at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) [kafka-streams-0.10.2.1.jar!/:na]
> > >
> > >
> > > It has been printing it for hours now, so it does not recover at all.
> > > The most worrying thing is that this stream definition does not even
> use
> > > state stores, it literally looks like this:
> > >
> > > KStreamBuilder builder = new KStreamBuilder();
> > >KStream kStream =
> > > builder.stream(appOptions.getInput().getTopic());
> > >kStream.process(() -> processor);
> > >new KafkaStreams(builder, streamsConfiguration);
> > >
> > > The "processor" does its thing and calls "context().commit()" when
> done.
> > > That's it. Looking at the actual machine running the instance, the
> > folders
> > > under /tmp/kafka-streams// only have a .lock file.
> > >
> > > This seems to have been bootstrapped by the exception:
> > >
> > > org.apache.kafka.clients.consumer.CommitFailedException: Commit
> cannot be
> > > completed since the group has already rebalanced and assigned the
> > > partitions to another member. This means that the time between
> subsequent
> > > calls to poll() was longer than the configured max.poll.interval.ms,
> > which
> > > typically implies that the poll loop is spending too much time message
> > > proc

kafka streams checkpoint and restore understanding

2017-06-07 Thread john cheng
I have two app instances; the input topic has 2 partitions, and each instance is
configured with one thread and one standby replica.
Also, instance1's state store is /tmp/kafka-streams and instance2's
state store is /tmp/kafka-streams2.
Now I do this experiment to study checkpointing in Kafka Streams (0.10.0.0).

1. start instance1, send two msgs (msg1 and msg2) to the input topic; no
checkpoint file
2. stop instance1; checkpoint file in /tmp/kafka-streams, both partitions'
offsets equal 1
3. restart instance1 (although a new UUID means a new instance, consider it
the same instance as before); no checkpoint file again
4. start instance2, send two msgs (msg3 and msg4) to the input topic; still no
checkpoint file
5. stop instance1; checkpoint in /tmp/kafka-streams, both partitions'
offsets equal 2
6. send two msgs (msg5 and msg6) to the input topic; these two msgs both go to
instance2
7. stop instance2; checkpoint in /tmp/kafka-streams2, both partitions'
offsets equal 3

After both instances are stopped, below is the checkpoint file content:

$ strings kafka-streams/*/*/.checkpoint   --> instance1
streams-wc-Counts-changelog 0 2
streams-wc-Counts-changelog 1 2
$ strings kafka-streams2/*/*/.checkpoint  --> instance2
streams-wc-Counts-changelog 0 3
streams-wc-Counts-changelog 1 3

Here is a simple table showing the partition and offset of each msg, and the
event that happened at that point.

      | Partition,Offset | Partition,Offset | What happened
msg1  | P0,0             |                  |
msg2  |                  | P1,0             | restart instance1
msg3  | P0,1             |                  | after starting instance2
msg4  |                  | P1,1             |
msg5  | P0,2             |                  | after stopping instance1
msg6  |                  | P1,2             |

Next, use kafka-consumer-offset-checker.sh to check the input topic; all
six msgs (three per partition) were consumed:

$ bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --topic streams-wc-input1 --group streams-wc
Group       Topic              Pid  Offset  logSize  Lag  Owner
streams-wc  streams-wc-input1  0    3       3        0    none
streams-wc  streams-wc-input1  1    3       3        0    none

Now, if we restart instance1 again: since only one instance exists, standby
tasks will not take effect.
That means instance1 will create two active StreamTasks, and both tasks will
restoreActiveState from the changelog topic.

When restoring an active task, we have to seek to some position; here, since
instance1's state store (the /tmp/kafka-streams checkpoint file)
covers both changelog partitions, the restoreConsumer will seek to position 2.

Now change to another situation: what about restarting instance2 instead of
instance1?
The restoreConsumer will seek to position 3, because instance2's active
tasks read the checkpoint in /tmp/kafka-streams2.

The difference between these two situations is the position from which the
StateStore in each StreamTask begins to restore.

Situation one, only restart instance1: beginning at position 2 means P0 seeks
to after msg3 and P1 seeks to after msg4.
Situation two, only restart instance2: beginning at position 3 means P0 seeks
to after msg5 and P1 seeks to after msg6.

From the partition view, msg1, msg3, and msg5 go to partition 0, and msg2, msg4, and msg6 go to partition 1.

msg (P0),count | offset | Instance1 restart | Instance2 restart
msg1,1         | 0      |                   |
msg3,1         | 1      |                   |
msg5,1         | 2      | <- seek at pos 2  |
               |        |                   | <- seek at pos 3

msg (P1),count | offset | Instance1 restart | Instance2 restart
msg2,1         | 0      |                   |
msg4,1         | 1      |                   |
msg6,1         | 2      | <- seek at pos 2  |
               |        |                   | <- seek at pos 3

The restore process polls records from the changelog topic starting at a specific
position.
In situation one, msg5 and msg6 are restored into the StreamTasks' state stores (msg5
to task1, msg6 to task0).
In situation two, nothing is restored into the StreamTasks' state stores???

I have some questions about the restore process and the checkpoint:
1. Should we seek to the beginning, because a restored state store must be a
complete view of the previous state?
2. In the two situations described above, what will happen?

Hope someone can explain this to me, or correct me if I've understood it wrong. Thanks
in advance.
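To make the seek-and-restore step concrete, here is a minimal sketch (not the actual
Streams internals) of what "seek to the checkpointed position and replay the changelog"
looks like with a plain consumer; the topic name and offsets come from the experiment
above, while consumerProps and everything else is illustrative:

// consumerProps is assumed to contain bootstrap.servers plus ByteArrayDeserializer
// for both key and value; it is not shown here.
KafkaConsumer<byte[], byte[]> restoreConsumer = new KafkaConsumer<>(consumerProps);
TopicPartition p0 = new TopicPartition("streams-wc-Counts-changelog", 0);
restoreConsumer.assign(Collections.singletonList(p0));
restoreConsumer.seek(p0, 2);  // instance1's checkpointed offset for partition 0
ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(100);
// only records at offset >= 2 (the msg5 update) are replayed into the local store;
// seeking at offset 3 (instance2's checkpoint) would replay nothing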


Re: Async Non Blocking Kafka Producer

2017-06-07 Thread Hans Jespersen
If you are setting acks=0 then you don't care about losing data even when the 
cluster is up. The only way to get at-least-once is acks=all.
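For what it's worth, here is a hedged sketch of a producer that keeps acks=all for
durability but fails fast instead of blocking the calling thread when the cluster is
unreachable; broker address, topic, and key/value are made-up placeholders:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");          // durability: wait for all in-sync replicas
props.put("max.block.ms", "0");    // fail fast instead of blocking on metadata or a full buffer
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("my-topic", "key", "value"), (metadata, exception) -> {
    if (exception != null) {
        // handle or re-queue the failure asynchronously instead of blocking the producer thread
    }
});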

> On Jun 7, 2017, at 1:12 PM, Ankit Jain  wrote:
> 
> Thanks hans.
> 
> It would work, but the producer will start losing data even when the cluster is
> available.
> 
> Thanks
> Ankit Jain
> 
>> On Wed, Jun 7, 2017 at 12:56 PM, Hans Jespersen  wrote:
>> 
>> Try adding props.put("max.block.ms", "0");
>> 
>> -hans
>> 
>> 
>> 
>>> On Jun 7, 2017, at 12:24 PM, Ankit Jain  wrote:
>>> 
>>> Hi,
>>> 
>>> We want to use a non-blocking Kafka producer. The producer thread should
>>> not block if the Kafka cluster is down or not reachable.
>>> 
>>> Currently, we are setting the following properties, but the producer thread is
>>> still blocking if the Kafka cluster goes down or is unreachable.
>>> 
>>> * props.put("block.on.buffer.full", "false");*
>>> * props.put("acks", "0");*
>>> 
>>> --
>>> Thanks
>> 
>> 
> 
> 
> -- 
> Thanks,
> Ankit Jain


Re: Kafka Streams Failed to rebalance error

2017-06-07 Thread João Peixoto
Eno before I do so I just want to be sure this would not be a duplicate. I
just found the following issues:

* https://issues.apache.org/jira/browse/KAFKA-5167. Marked as being fixed
on 0.11.0.0/0.10.2.2 (both not released afaik)
* https://issues.apache.org/jira/browse/KAFKA-5070. Currently in progress
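For reference, a hedged sketch of the tuning mentioned further down in this thread
(fewer records per poll, more time allowed between polls, less frequent commits); the
application id, broker address, and numbers are made up and would need to be sized for
the real workload, and builder refers to the topology shown in the quoted code below:

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");      // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
// fewer records per poll() so each loop iteration stays well under max.poll.interval.ms
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
// more headroom for slow processing before the member is kicked out of the group
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);
// commit less frequently to reduce per-record overhead
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);
KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(props));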

On Wed, Jun 7, 2017 at 2:24 PM Eno Thereska  wrote:

> Hi there,
>
> This might be a bug, would you mind opening a JIRA (copy-pasting below is
> sufficient).
>
> Thanks
> Eno
> > On 7 Jun 2017, at 21:38, João Peixoto  wrote:
> >
> > I'm using Kafka Streams 0.10.2.1 and I still see this error
> >
> > 2017-06-07 20:28:37.211  WARN 73 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : Could not create task 0_31. Will retry.
> >
> > org.apache.kafka.streams.errors.LockException: task [0_31] Failed to lock the state directory for task 0_31
> > at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100) ~[kafka-streams-0.10.2.1.jar!/:na]
> > at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) ~[kafka-streams-0.10.2.1.jar!/:na]
> > at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) ~[kafka-streams-0.10.2.1.jar!/:na]
> > at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) [kafka-streams-0.10.2.1.jar!/:na]
> > at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) ~[kafka-streams-0.10.2.1.jar!/:na]
> > at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) ~[kafka-streams-0.10.2.1.jar!/:na]
> > at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) [kafka-streams-0.10.2.1.jar!/:na]
> > at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) [kafka-streams-0.10.2.1.jar!/:na]
> > at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) [kafka-streams-0.10.2.1.jar!/:na]
> > at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) [kafka-clients-0.10.2.1.jar!/:na]
> > at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) [kafka-clients-0.10.2.1.jar!/:na]
> > at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [kafka-clients-0.10.2.1.jar!/:na]
> > at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) [kafka-clients-0.10.2.1.jar!/:na]
> > at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) [kafka-clients-0.10.2.1.jar!/:na]
> > at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [kafka-clients-0.10.2.1.jar!/:na]
> > at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) [kafka-streams-0.10.2.1.jar!/:na]
> > at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) [kafka-streams-0.10.2.1.jar!/:na]
> >
> >
> > It has been printing it for hours now, so it does not recover at all.
> > The most worrying thing is that this stream definition does not even use
> > state stores, it literally looks like this:
> >
> > KStreamBuilder builder = new KStreamBuilder();
> >KStream kStream =
> > builder.stream(appOptions.getInput().getTopic());
> >kStream.process(() -> processor);
> >new KafkaStreams(builder, streamsConfiguration);
> >
> > The "processor" does its thing and calls "context().commit()" when done.
> > That's it. Looking at the actual machine running the instance, the
> folders
> > under /tmp/kafka-streams// only have a .lock file.
> >
> > This seems to have been bootstrapped by the exception:
> >
> > org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
> > completed since the group has already rebalanced and assigned the
> > partitions to another member. This means that the time between subsequent
> > calls to poll() was longer than the configured max.poll.interval.ms,
> which
> > typically implies that the poll loop is spending too much time message
> > processing. You can address this either by increasing the session timeout
> > or by reducing the maximum size of batches returned in poll() with
> > max.poll.records.
> >
> > We are addressing the latter by reducing "max.poll.records" and
> > increasing "commit.interval.ms". Nonetheless, shouldn't Kafka Streams skip the
> > state dirs entirely if there are no state stores? Since it doesn't seem to do so
> > automatically, can I configure it somehow to achieve this end?
> >
> > Additional

Re: Reliably implementing global KeyValueStore#get

2017-06-07 Thread Steven Schlansker
Indeed, all good points.  Thanks all for the continuing valuable feedback!
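Putting the advice in this thread together, here is a minimal sketch of the
recheck-after-null pattern, assuming the interactive-queries metadata API
(KafkaStreams#metadataForKey) available since 0.10.1; the store name, Message type,
keySerializer, and the httpGet remoting helper are hypothetical placeholders for the
remoting layer, not a real API:

// Ask Streams which host owns the key, query it remotely, and treat a null as suspect:
// re-check ownership afterwards and retry once if the partition migrated meanwhile.
StreamsMetadata before = streams.metadataForKey("my-store", key, keySerializer);
Message result = httpGet(before.host(), before.port(), key);  // remote ReadOnlyKeyValueStore#get
if (result == null) {
    StreamsMetadata after = streams.metadataForKey("my-store", key, keySerializer);
    boolean sameOwner = after.host().equals(before.host()) && after.port() == before.port();
    if (!sameOwner) {
        // ownership changed while we were querying; retry against the new owner
        result = httpGet(after.host(), after.port(), key);
    }
}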

> On Jun 7, 2017, at 3:07 PM, Matthias J. Sax  wrote:
> 
> If you write to a remote DB, keep in mind that this will impact your
> Streams app, as you lose data locality.
> 
> Thus, populating a DB from the changelog might be better. It also
> decouples both systems, which gives you the advantage that your Streams app
> can still run if the DB has an issue. If you write directly into the DB and the DB
> is not available, the Streams app is doomed to fail too.
> 
> 
> -Matthias
> 
> 
> On 6/7/17 2:54 PM, Jan Filipiak wrote:
>> Depends; an embedded Postgres puts you into the same spot.
>> 
>> But if you use your state store changelog to materialize into a
>> Postgres, that might work out decently.
>> The current JDBC sink doesn't support deletes, which is an issue, but writing a
>> custom sink is not too hard.
>> 
>> Best Jan
>> 
>> 
>> On 07.06.2017 23:47, Steven Schlansker wrote:
>>> I was actually considering writing my own KeyValueStore backed
>>> by e.g. a Postgres or the like.
>>> 
>>> Is there some feature Connect gains me that would make it better
>>> than such an approach?
>>> 
>>> thanks
>>> 
 On Jun 7, 2017, at 2:20 PM, Jan Filipiak 
 wrote:
 
 Hi,
 
 have you thought about using connect to put data into a store that is
 more reasonable for your kind of query requirements?
 
 Best Jan
 
 On 07.06.2017 00:29, Steven Schlansker wrote:
>> On Jun 6, 2017, at 2:52 PM, Damian Guy  wrote:
>> 
>> Steven,
>> 
>> In practice, data shouldn't be migrating that often. If it is then you
>> probably have bigger problems.
> Understood and agreed, but when designing distributed systems, it
> usually
> helps to model for the worst case rather than the "well that should
> never
> happen" case, lest you find yourself fixing those bugs at 3am
> instead :)
> 
> I'd like to be able to induce extreme pain at the Kafka layer
> (change leader
> every 3 seconds and migrate all partitions around randomly) and
> still have
> my app behave correctly.
> 
>> You should be able to use the metadata api
>> to find the instance the key should be on and then when you check
>> that node
>> you can also check with the metadata api that the key should still
>> be on
>> this host. If streams is rebalancing while you query an exception
>> will be
>> raised and you'll need to retry the request once the rebalance has
>> completed.
> Agreed here as well.  But let's assume I have a very fast replication
> setup (assume it takes zero time, for the sake of argument) -- I'm
> fairly
> sure there's still a race here as this exception only fires *during
> a migration*
> not *after a migration that may have invalidated your metadata
> lookup completes*
> 
>> HTH,
>> Damian
>> 
>> On Tue, 6 Jun 2017 at 18:11 Steven Schlansker
>> 
>> wrote:
>> 
 On Jun 6, 2017, at 6:16 AM, Eno Thereska 
 wrote:
 
 Hi Steven,
 
 Do you know beforehand if a key exists? If you know that and are
 getting
>>> null() the code will have to retry by refreshing the metadata and
>>> going to
>>> the new instance. If you don’t know beforehand if a key exists or
>>> not you
>>> might have to check all instances of a store to make sure.
>>> No, I am not presupposing that the key can exist -- this is a user
>>> visible
>>> API and will
>>> be prone to "accidents" :)
>>> 
>>> Thanks for the insight.  I worry that even checking all stores is not
>>> truly sufficient,
>>> as querying different all workers at different times in the
>>> presence of
>>> migrating data
>>> can still in theory miss it given pessimal execution.
>>> 
>>> I'm sure I've long wandered off into the hypothetical, but I dream
>>> of some
>>> day being
>>> cool like Jepsen :)
>>> 
 Eno
 
 
> On Jun 5, 2017, at 10:12 PM, Steven Schlansker <
>>> sschlans...@opentable.com> wrote:
> Hi everyone, me again :)
> 
> I'm still trying to implement my "remoting" layer that allows
> my clients to see the partitioned Kafka Streams state
> regardless of which instance they hit.  Roughly, my lookup is:
> 
> Message get(Key key) {
>  RemoteInstance instance = selectPartition(key);
>  return instance.get(key); // http remoting
> }
> 
> RemoteInstance.get(Key key) { // http endpoint
>  return readOnlyKeyValueStore.get(key);
> }
> 
> However, the mapping of partitions to instances may change.
> If you call KeyValueStore.get(K) where K is on a partition you
> don't own, it returns null.  This is indistinguishable from a
> successful get on a key that doesn

Re: Reliably implementing global KeyValueStore#get

2017-06-07 Thread Matthias J. Sax
If you write to a remote DB, keep in mind that this will impact your
Streams app, as you lose data locality.

Thus, populating a DB from the changelog might be better. It also
decouples both systems, which gives you the advantage that your Streams app
can still run if the DB has an issue. If you write directly into the DB and the DB
is not available, the Streams app is doomed to fail too.


-Matthias


On 6/7/17 2:54 PM, Jan Filipiak wrote:
> Depends; an embedded Postgres puts you into the same spot.
> 
> But if you use your state store changelog to materialize into a
> Postgres, that might work out decently.
> The current JDBC sink doesn't support deletes, which is an issue, but writing a
> custom sink is not too hard.
> 
> Best Jan
> 
> 
> On 07.06.2017 23:47, Steven Schlansker wrote:
>> I was actually considering writing my own KeyValueStore backed
>> by e.g. a Postgres or the like.
>>
>> Is there some feature Connect gains me that would make it better
>> than such an approach?
>>
>> thanks
>>
>>> On Jun 7, 2017, at 2:20 PM, Jan Filipiak 
>>> wrote:
>>>
>>> Hi,
>>>
>>> have you thought about using connect to put data into a store that is
>>> more reasonable for your kind of query requirements?
>>>
>>> Best Jan
>>>
>>> On 07.06.2017 00:29, Steven Schlansker wrote:
> On Jun 6, 2017, at 2:52 PM, Damian Guy  wrote:
>
> Steven,
>
> In practice, data shouldn't be migrating that often. If it is then you
> probably have bigger problems.
 Understood and agreed, but when designing distributed systems, it
 usually
 helps to model for the worst case rather than the "well that should
 never
 happen" case, lest you find yourself fixing those bugs at 3am
 instead :)

 I'd like to be able to induce extreme pain at the Kafka layer
 (change leader
 every 3 seconds and migrate all partitions around randomly) and
 still have
 my app behave correctly.

> You should be able to use the metadata api
> to find the instance the key should be on and then when you check
> that node
> you can also check with the metadata api that the key should still
> be on
> this host. If streams is rebalancing while you query an exception
> will be
> raised and you'll need to retry the request once the rebalance has
> completed.
 Agreed here as well.  But let's assume I have a very fast replication
 setup (assume it takes zero time, for the sake of argument) -- I'm
 fairly
 sure there's still a race here as this exception only fires *during
 a migration*
 not *after a migration that may have invalidated your metadata
 lookup completes*

> HTH,
> Damian
>
> On Tue, 6 Jun 2017 at 18:11 Steven Schlansker
> 
> wrote:
>
>>> On Jun 6, 2017, at 6:16 AM, Eno Thereska 
>>> wrote:
>>>
>>> Hi Steven,
>>>
>>> Do you know beforehand if a key exists? If you know that and are
>>> getting
>> null() the code will have to retry by refreshing the metadata and
>> going to
>> the new instance. If you don’t know beforehand if a key exists or
>> not you
>> might have to check all instances of a store to make sure.
>> No, I am not presupposing that the key can exist -- this is a user
>> visible
>> API and will
>> be prone to "accidents" :)
>>
>> Thanks for the insight.  I worry that even checking all stores is not
>> truly sufficient,
>> as querying different all workers at different times in the
>> presence of
>> migrating data
>> can still in theory miss it given pessimal execution.
>>
>> I'm sure I've long wandered off into the hypothetical, but I dream
>> of some
>> day being
>> cool like Jepsen :)
>>
>>> Eno
>>>
>>>
 On Jun 5, 2017, at 10:12 PM, Steven Schlansker <
>> sschlans...@opentable.com> wrote:
 Hi everyone, me again :)

 I'm still trying to implement my "remoting" layer that allows
 my clients to see the partitioned Kafka Streams state
 regardless of which instance they hit.  Roughly, my lookup is:

 Message get(Key key) {
   RemoteInstance instance = selectPartition(key);
   return instance.get(key); // http remoting
 }

 RemoteInstance.get(Key key) { // http endpoint
   return readOnlyKeyValueStore.get(key);
 }

 However, the mapping of partitions to instances may change.
 If you call KeyValueStore.get(K) where K is on a partition you
 don't own, it returns null.  This is indistinguishable from a
 successful get on a key that doesn't exist.

 If one instance selects a sibling instance right as the
 partition is
>> failing
 off of that instance, it may get routed there and by the time it
 gets
 the request no longer "owns" the partition -- returns a false
 'null'.

>

Re: Reliably implementing global KeyValueStore#get

2017-06-07 Thread Jan Filipiak

Depends; an embedded Postgres puts you into the same spot.

But if you use your state store changelog to materialize into a 
Postgres, that might work out decently.
The current JDBC sink doesn't support deletes, which is an issue, but writing a 
custom sink is not too hard.
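As a rough sketch only: materializing the changelog into Postgres via the Confluent
JDBC sink connector might be configured along these lines. The property names are from
memory and may differ by connector version, and the topic, database URL, credentials,
and key column are all placeholders:

name=counts-changelog-jdbc-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=my-app-my-store-changelog
connection.url=jdbc:postgresql://localhost:5432/mydb
connection.user=postgres
connection.password=postgres
insert.mode=upsert
pk.mode=record_key
pk.fields=id
auto.create=true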


Best Jan


On 07.06.2017 23:47, Steven Schlansker wrote:

I was actually considering writing my own KeyValueStore backed
by e.g. a Postgres or the like.

Is there some feature Connect gains me that would make it better
than such an approach?

thanks


On Jun 7, 2017, at 2:20 PM, Jan Filipiak  wrote:

Hi,

have you thought about using connect to put data into a store that is more 
reasonable for your kind of query requirements?

Best Jan

On 07.06.2017 00:29, Steven Schlansker wrote:

On Jun 6, 2017, at 2:52 PM, Damian Guy  wrote:

Steven,

In practice, data shouldn't be migrating that often. If it is then you
probably have bigger problems.

Understood and agreed, but when designing distributed systems, it usually
helps to model for the worst case rather than the "well that should never
happen" case, lest you find yourself fixing those bugs at 3am instead :)

I'd like to be able to induce extreme pain at the Kafka layer (change leader
every 3 seconds and migrate all partitions around randomly) and still have
my app behave correctly.


You should be able to use the metadata api
to find the instance the key should be on and then when you check that node
you can also check with the metadata api that the key should still be on
this host. If streams is rebalancing while you query an exception will be
raised and you'll need to retry the request once the rebalance has
completed.

Agreed here as well.  But let's assume I have a very fast replication
setup (assume it takes zero time, for the sake of argument) -- I'm fairly
sure there's still a race here as this exception only fires *during a migration*
not *after a migration that may have invalidated your metadata lookup completes*


HTH,
Damian

On Tue, 6 Jun 2017 at 18:11 Steven Schlansker 
wrote:


On Jun 6, 2017, at 6:16 AM, Eno Thereska  wrote:

Hi Steven,

Do you know beforehand if a key exists? If you know that and are getting

null() the code will have to retry by refreshing the metadata and going to
the new instance. If you don’t know beforehand if a key exists or not you
might have to check all instances of a store to make sure.
No, I am not presupposing that the key can exist -- this is a user visible
API and will
be prone to "accidents" :)

Thanks for the insight.  I worry that even checking all stores is not
truly sufficient,
as querying different all workers at different times in the presence of
migrating data
can still in theory miss it given pessimal execution.

I'm sure I've long wandered off into the hypothetical, but I dream of some
day being
cool like Jepsen :)


Eno



On Jun 5, 2017, at 10:12 PM, Steven Schlansker <

sschlans...@opentable.com> wrote:

Hi everyone, me again :)

I'm still trying to implement my "remoting" layer that allows
my clients to see the partitioned Kafka Streams state
regardless of which instance they hit.  Roughly, my lookup is:

Message get(Key key) {
  RemoteInstance instance = selectPartition(key);
  return instance.get(key); // http remoting
}

RemoteInstance.get(Key key) { // http endpoint
  return readOnlyKeyValueStore.get(key);
}

However, the mapping of partitions to instances may change.
If you call KeyValueStore.get(K) where K is on a partition you
don't own, it returns null.  This is indistinguishable from a
successful get on a key that doesn't exist.

If one instance selects a sibling instance right as the partition is

failing

off of that instance, it may get routed there and by the time it gets
the request no longer "owns" the partition -- returns a false 'null'.

You can try re-checking after you get a null value, but that's

susceptible

to the same race -- it's unlikely but possible that the data migrates

*back*

before you do this re-check.

Is there any way to correctly implement this without races?  I'd imagine
you need a new primitive like KeyValueStore#get that atomically finds
the key or throws an exception if it is not in an owned partition
at the time of lookup so you know to recheck the partition and retry.

Thoughts?

Thanks again,
Steven





Re: Reliably implementing global KeyValueStore#get

2017-06-07 Thread Steven Schlansker
I was actually considering writing my own KeyValueStore backed
by e.g. a Postgres or the like.

Is there some feature Connect gains me that would make it better
than such an approach?

thanks

> On Jun 7, 2017, at 2:20 PM, Jan Filipiak  wrote:
> 
> Hi,
> 
> have you thought about using connect to put data into a store that is more 
> reasonable for your kind of query requirements?
> 
> Best Jan
> 
> On 07.06.2017 00:29, Steven Schlansker wrote:
>>> On Jun 6, 2017, at 2:52 PM, Damian Guy  wrote:
>>> 
>>> Steven,
>>> 
>>> In practice, data shouldn't be migrating that often. If it is then you
>>> probably have bigger problems.
>> Understood and agreed, but when designing distributed systems, it usually
>> helps to model for the worst case rather than the "well that should never
>> happen" case, lest you find yourself fixing those bugs at 3am instead :)
>> 
>> I'd like to be able to induce extreme pain at the Kafka layer (change leader
>> every 3 seconds and migrate all partitions around randomly) and still have
>> my app behave correctly.
>> 
>>> You should be able to use the metadata api
>>> to find the instance the key should be on and then when you check that node
>>> you can also check with the metadata api that the key should still be on
>>> this host. If streams is rebalancing while you query an exception will be
>>> raised and you'll need to retry the request once the rebalance has
>>> completed.
>> Agreed here as well.  But let's assume I have a very fast replication
>> setup (assume it takes zero time, for the sake of argument) -- I'm fairly
>> sure there's still a race here as this exception only fires *during a 
>> migration*
>> not *after a migration that may have invalidated your metadata lookup 
>> completes*
>> 
>>> HTH,
>>> Damian
>>> 
>>> On Tue, 6 Jun 2017 at 18:11 Steven Schlansker 
>>> wrote:
>>> 
> On Jun 6, 2017, at 6:16 AM, Eno Thereska  wrote:
> 
> Hi Steven,
> 
> Do you know beforehand if a key exists? If you know that and are getting
 null() the code will have to retry by refreshing the metadata and going to
 the new instance. If you don’t know beforehand if a key exists or not you
 might have to check all instances of a store to make sure.
 No, I am not presupposing that the key can exist -- this is a user visible
 API and will
 be prone to "accidents" :)
 
 Thanks for the insight.  I worry that even checking all stores is not
 truly sufficient,
 as querying different all workers at different times in the presence of
 migrating data
 can still in theory miss it given pessimal execution.
 
 I'm sure I've long wandered off into the hypothetical, but I dream of some
 day being
 cool like Jepsen :)
 
> Eno
> 
> 
>> On Jun 5, 2017, at 10:12 PM, Steven Schlansker <
 sschlans...@opentable.com> wrote:
>> Hi everyone, me again :)
>> 
>> I'm still trying to implement my "remoting" layer that allows
>> my clients to see the partitioned Kafka Streams state
>> regardless of which instance they hit.  Roughly, my lookup is:
>> 
>> Message get(Key key) {
>>  RemoteInstance instance = selectPartition(key);
>>  return instance.get(key); // http remoting
>> }
>> 
>> RemoteInstance.get(Key key) { // http endpoint
>>  return readOnlyKeyValueStore.get(key);
>> }
>> 
>> However, the mapping of partitions to instances may change.
>> If you call KeyValueStore.get(K) where K is on a partition you
>> don't own, it returns null.  This is indistinguishable from a
>> successful get on a key that doesn't exist.
>> 
>> If one instance selects a sibling instance right as the partition is
 failing
>> off of that instance, it may get routed there and by the time it gets
>> the request no longer "owns" the partition -- returns a false 'null'.
>> 
>> You can try re-checking after you get a null value, but that's
 susceptible
>> to the same race -- it's unlikely but possible that the data migrates
 *back*
>> before you do this re-check.
>> 
>> Is there any way to correctly implement this without races?  I'd imagine
>> you need a new primitive like KeyValueStore#get that atomically finds
>> the key or throws an exception if it is not in an owned partition
>> at the time of lookup so you know to recheck the partition and retry.
>> 
>> Thoughts?
>> 
>> Thanks again,
>> Steven
>> 
 
> 





Re: kafka streams changelog topic value

2017-06-07 Thread Matthias J. Sax
ConsoleConsumer by default uses String deserializer, but value in the
changelog is of type long. For output topic, the type in converted from
long to string though -- thus you can read the output topic without
problems.

For reading the changelog topic, you need to specify option

 --property
value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

(hope I got the option right :))
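For example, the full command might look like this (topic name taken from the thread
below; the exact property name may vary slightly by version):

bin/kafka-console-consumer.sh --new-consumer --bootstrap-server localhost:9092 \
  --topic streams-wordcount-Counts-changelog \
  --property print.key=true --property key.separator=":" \
  --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer \
  --from-beginning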


-Matthias


On 6/7/17 6:40 AM, john cheng wrote:
> the right way to see the changelog persisted by RocksDB is to use the
> ByteArrayDeserializer, and then decode the bytes to a hex string
> 
> props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> "org.apache.kafka.common.serialization.ByteArrayDeserializer");
> 
> for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
>   System.out.println(bytesToHexString(record.value()));
> }
> 
> public static String bytesToHexString(byte[] src){
> StringBuilder stringBuilder = new StringBuilder("");
> if (src == null || src.length <= 0) {
> return null;
> }
> for (int i = 0; i < src.length; i++) {
> int v = src[i] & 0xFF;
> String hv = Integer.toHexString(v);
> if (hv.length() < 2) {
> stringBuilder.append(0);
> }
> stringBuilder.append(hv);
> }
> return stringBuilder.toString();
> }
> 
> output is now correct:
> 
> p:0,o:0,k:msg1,v:0001
> p:0,o:1,k:msg3,v:0001
> p:0,o:2,k:msg5,v:0001
> p:0,o:3,k:msg1,v:0002
> p:0,o:4,k:msg3,v:0002
> p:1,o:0,k:msg2,v:0001
> p:1,o:1,k:msg4,v:0001
> p:1,o:2,k:msg2,v:0002
> p:1,o:3,k:msg2,v:0003
> 
> 
> 2017-06-07 18:42 GMT+08:00 john cheng :
> 
>> I add some log on StoreChangeLog
>>
>> for (K k : this.dirty) {
>> V v = getter.get(k);
>> log.info("logChange key:{},value:{}", k, v);
>> collector.send(new ProducerRecord<>(this.topic, this.partition, k, v), 
>> keySerializer, valueSerializer);
>> }
>>
>> and found the print result is normal, just some byte:
>>
>> [2017-06-07 18:39:43,131] INFO logChange removed:[], dirty:[kafka] 
>> (org.apache.kafka.streams.state.internals.StoreChangeLogger)
>> [2017-06-07 18:39:43,132] INFO logChange key:kafka,value:[0, 0, 0, 2] 
>> (org.apache.kafka.streams.state.internals.StoreChangeLogger)
>>
>> but the console-consumer still outputs "kafka:" with an empty value,
>>
>> so this may be a problem with the console-consumer.
>>
>>
>> 2017-06-07 18:24 GMT+08:00 john cheng :
>>
>>> I'm running WordCountProcessorDemo with the Processor API, and changed a few
>>> things:
>>> 1. configured 1 stream thread and 1 standby replica
>>> 2. changed inMemory() to persistent()
>>> My Kafka version is 0.10.0.0. After running the streaming application, I check
>>> the msg output with the console-consumer
>>> ➜  kafka_2.10-0.10.0.0 bin/kafka-console-producer.sh --broker-list
>>> localhost:9092 --topic streams-input2
>>> msg1  # by console-producer, we can only produce message's value. so
>>> message produce to input topic will use roundrobbin partition
>>> msg2
>>> msg3
>>> msg4
>>> msg5
>>> msg6
>>> ➜  kafka_2.10-0.10.0.0 bin/kafka-console-consumer.sh --new-consumer
>>> --bootstrap-server localhost:9092 --topic streams-input2 --property
>>> print.key=true --property key.separator=":" --from-beginning
>>> null:msg2  # key is null, value is what we produce above
>>> null:msg4
>>> null:msg6
>>> null:msg1
>>> null:msg3
>>> null:msg5
>>> ➜  kafka_2.10-0.10.0.0 bin/kafka-console-consumer.sh --new-consumer
>>> --bootstrap-server localhost:9092 --topic streams-output2 --property
>>> print.key=true --property key.separator=":" --from-beginning
>>> msg2:1
>>> msg1:1
>>> msg1:1
>>> msg3:1
>>> msg2:1
>>> msg4:1
>>> msg1:1
>>> msg3:1
>>> msg5:1
>>> msg2:1
>>> msg4:1
>>> msg6:1  # due to log compaction, same key will be overwrite. this is ok...
>>> ➜  kafka_2.10-0.10.0.0 bin/kafka-console-consumer.sh --new-consumer
>>> --bootstrap-server localhost:9092 --topic streams-wordcount-Counts-changelog
>>> --property print.key=true --property key.separator=":" --from-beginning
>>> *msg2:*
>>> *msg4:*
>>> *msg6:*
>>> *msg1:*
>>> *msg3:*
>>> *msg5:*
>>> Everything is ok, except the changelog topic troubles me. Why is its value
>>> empty?
>>>
>>> I have dug into the source code, and here is a summary of the workflow:
>>> 1. when WordCountProcessorDemo's ProcessorNode put kv to KeyValueStore,
>>> no matter Memory or RocksDB, they both put into local storage, then append
>>> msg to StoreChangeLogger
>>> 2. the key append to StoreChangeLogger first, and invoke maybeLogChange
>>> by passing ValueGetter, this getter will get value from local storage when
>>> logChange() operation happen
>>> 3. when logChange() on StoreChangeLogger happen, send KeyValue message to
>>> changelog topic, here is  streams-wordcount-Counts-changelog
>>> 4. StoreChangeLogger use dirty and remove Set to swap between logChange()
>>> and add().
>>>
>>> for (K k : this.dirty) { // logChange() method flush dirty to changelog 
>>> topic
>>> V v = getter.get(k);  // value getter will get value from local storage
>>> collector.send(new ProducerRecord<>(this.topic, this.partition, k, v

Re: Reliably implementing global KeyValueStore#get

2017-06-07 Thread Jan Filipiak

Hi,

have you thought about using connect to put data into a store that is 
more reasonable for your kind of query requirements?


Best Jan

On 07.06.2017 00:29, Steven Schlansker wrote:

On Jun 6, 2017, at 2:52 PM, Damian Guy  wrote:

Steven,

In practice, data shouldn't be migrating that often. If it is then you
probably have bigger problems.

Understood and agreed, but when designing distributed systems, it usually
helps to model for the worst case rather than the "well that should never
happen" case, lest you find yourself fixing those bugs at 3am instead :)

I'd like to be able to induce extreme pain at the Kafka layer (change leader
every 3 seconds and migrate all partitions around randomly) and still have
my app behave correctly.


You should be able to use the metadata api
to find the instance the key should be on and then when you check that node
you can also check with the metadata api that the key should still be on
this host. If streams is rebalancing while you query an exception will be
raised and you'll need to retry the request once the rebalance has
completed.

Agreed here as well.  But let's assume I have a very fast replication
setup (assume it takes zero time, for the sake of argument) -- I'm fairly
sure there's still a race here as this exception only fires *during a migration*
not *after a migration that may have invalidated your metadata lookup completes*


HTH,
Damian

On Tue, 6 Jun 2017 at 18:11 Steven Schlansker 
wrote:


On Jun 6, 2017, at 6:16 AM, Eno Thereska  wrote:

Hi Steven,

Do you know beforehand if a key exists? If you know that and are getting

null() the code will have to retry by refreshing the metadata and going to
the new instance. If you don’t know beforehand if a key exists or not you
might have to check all instances of a store to make sure.
No, I am not presupposing that the key can exist -- this is a user visible
API and will
be prone to "accidents" :)

Thanks for the insight.  I worry that even checking all stores is not
truly sufficient,
as querying different all workers at different times in the presence of
migrating data
can still in theory miss it given pessimal execution.

I'm sure I've long wandered off into the hypothetical, but I dream of some
day being
cool like Jepsen :)


Eno



On Jun 5, 2017, at 10:12 PM, Steven Schlansker <

sschlans...@opentable.com> wrote:

Hi everyone, me again :)

I'm still trying to implement my "remoting" layer that allows
my clients to see the partitioned Kafka Streams state
regardless of which instance they hit.  Roughly, my lookup is:

Message get(Key key) {
  RemoteInstance instance = selectPartition(key);
  return instance.get(key); // http remoting
}

RemoteInstance.get(Key key) { // http endpoint
  return readOnlyKeyValueStore.get(key);
}

However, the mapping of partitions to instances may change.
If you call KeyValueStore.get(K) where K is on a partition you
don't own, it returns null.  This is indistinguishable from a
successful get on a key that doesn't exist.

If one instance selects a sibling instance right as the partition is

failing

off of that instance, it may get routed there and by the time it gets
the request no longer "owns" the partition -- returns a false 'null'.

You can try re-checking after you get a null value, but that's

susceptible

to the same race -- it's unlikely but possible that the data migrates

*back*

before you do this re-check.

Is there any way to correctly implement this without races?  I'd imagine
you need a new primitive like KeyValueStore#get that atomically finds
the key or throws an exception if it is not in an owned partition
at the time of lookup so you know to recheck the partition and retry.

Thoughts?

Thanks again,
Steven







Re: Kafka Streams Failed to rebalance error

2017-06-07 Thread Eno Thereska
Hi there,

This might be a bug, would you mind opening a JIRA (copy-pasting below is 
sufficient).

Thanks
Eno
> On 7 Jun 2017, at 21:38, João Peixoto  wrote:
> 
> I'm using Kafka Streams 0.10.2.1 and I still see this error
> 
> 2017-06-07 20:28:37.211  WARN 73 --- [ StreamThread-1]
> o.a.k.s.p.internals.StreamThread : Could not create task 0_31. Will
> retry.
> 
> org.apache.kafka.streams.errors.LockException: task [0_31] Failed to lock
> the state directory for task 0_31
> at
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.(ProcessorStateManager.java:100)
> ~[kafka-streams-0.10.2.1.jar!/:na]
> at
> org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:73)
> ~[kafka-streams-0.10.2.1.jar!/:na]
> at
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:108)
> ~[kafka-streams-0.10.2.1.jar!/:na]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
> [kafka-streams-0.10.2.1.jar!/:na]
> at
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
> ~[kafka-streams-0.10.2.1.jar!/:na]
> at
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
> ~[kafka-streams-0.10.2.1.jar!/:na]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
> [kafka-streams-0.10.2.1.jar!/:na]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
> [kafka-streams-0.10.2.1.jar!/:na]
> at
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
> [kafka-streams-0.10.2.1.jar!/:na]
> at
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
> [kafka-clients-0.10.2.1.jar!/:na]
> at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
> [kafka-clients-0.10.2.1.jar!/:na]
> at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> [kafka-clients-0.10.2.1.jar!/:na]
> at
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
> [kafka-clients-0.10.2.1.jar!/:na]
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
> [kafka-clients-0.10.2.1.jar!/:na]
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
> [kafka-clients-0.10.2.1.jar!/:na]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
> [kafka-streams-0.10.2.1.jar!/:na]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> [kafka-streams-0.10.2.1.jar!/:na]
> 
> 
> It has been printing it for hours now, so it does not recover at all.
> The most worrying thing is that this stream definition does not even use
> state stores, it literally looks like this:
> 
> KStreamBuilder builder = new KStreamBuilder();
>KStream kStream =
> builder.stream(appOptions.getInput().getTopic());
>kStream.process(() -> processor);
>new KafkaStreams(builder, streamsConfiguration);
> 
> The "processor" does its thing and calls "context().commit()" when done.
> That's it. Looking at the actual machine running the instance, the folders
> under /tmp/kafka-streams// only have a .lock file.
> 
> This seems to have been bootstrapped by the exception:
> 
> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
> completed since the group has already rebalanced and assigned the
> partitions to another member. This means that the time between subsequent
> calls to poll() was longer than the configured max.poll.interval.ms, which
> typically implies that the poll loop is spending too much time message
> processing. You can address this either by increasing the session timeout
> or by reducing the maximum size of batches returned in poll() with
> max.poll.records.
> 
> We are addressing the latter by reducing "max.poll.records" and increasing "
> commit.interval.ms", nonetheless, shouldn't Kafka Streams not worry about
> state dirs if there are no state stores? Since it doesn't seem to do so
> automatically, can I configured it somehow to achieve this end?
> 
> Additionally, what could lead to it not being able to recover?
> 
> On Tue, May 16, 2017 at 3:17 PM Matthias J. Sax 
> wrote:
> 
>> Great! :)
>> 
>> On 5/16/17 2:31 AM, Sameer Kumar wrote:
>>> I see now that my Kafka cluster is very stable, and these errors dont
>> come
>>> now.
>>> 
>>> -Sameer.
>>> 
>>> On Fri, May 5, 2017 at 7:53 AM, Sameer Kumar 
>> wrote:
>>> 
 Yes, I have upgraded my cluster and client both to version 10.2.1 and
 currently monitoring the situation.
 Will report back in case I find any errors. Thanks for the help though.
 
 -Sameer.
 
 On Fri, 

Re: [DISCUSS]: KIP-161: streams record processing exception handlers

2017-06-07 Thread Jan Filipiak

Hi Eno,

On 07.06.2017 22:49, Eno Thereska wrote:

Comments inline:


On 5 Jun 2017, at 18:19, Jan Filipiak  wrote:

Hi

just my few thoughts

On 05.06.2017 11:44, Eno Thereska wrote:

Hi there,

Sorry for the late reply, I was out this past week. Looks like good progress 
was made with the discussions either way. Let me recap a couple of points I saw 
into one big reply:

1. Jan mentioned CRC errors. I think this is a good point. As these happen in 
Kafka, before Kafka Streams gets a chance to inspect anything, I'd like to hear 
the opinion of more Kafka folks like Ismael or Jason on this one. Currently the 
documentation is not great with what to do once a CRC check has failed. From 
looking at the code, it looks like the client gets a KafkaException (bubbled up 
from the fetcher) and currently we in streams catch this as part of poll() and 
fail. It might be advantageous to treat CRC handling in a similar way to 
serialisation handling (e.g., have the option to fail/skip). Let's see what the 
other folks say. Worst-case we can do a separate KIP for that if it proved too 
hard to do in one go.

there is no reasonable way to "skip" a crc error. How can you know the length 
you read was anything reasonable? you might be completely lost inside your response.

On the client side, every record received is checked for validity. As it 
happens, if the CRC check fails the exception is wrapped with a KafkaException 
that is thrown all the way to poll(). Assuming we change that and poll() throws 
a CRC exception, I was thinking we could treat it similarly to a deserialize 
exception and pass it to the exception handler to decide what to do. Default 
would be to fail. This might need a Kafka KIP btw and can be done separately 
from this KIP, but Jan, would you find this useful?
I don't think so. IMO you cannot reasonably continue parsing when the 
checksum of a message is not correct. If you are not sure you got the 
correct length, how can you be sure to find the next record? I would 
always fail outright in all cases. It's too hard for me to understand why 
one would try to continue. I mentioned CRCs because that's the only bad 
pill I ever saw so far. But I am happy that it just stopped and I could 
check what was going on. This will also be invasive in the client code then.


If you ask me, I am always going to vote for "grind to a halt": let the 
developers see what happened and let them fix it. It helps build good 
Kafka experiences and better software and architectures. For me this is: 
"force the user to do the right thing". 
https://youtu.be/aAb7hSCtvGw?t=374 e.g. not letting unexpected input slip 
by.  Letting unexpected input slip by is what brought us 15+ years of war 
of all sorts of ingestion attacks. I don't even dare to estimate how 
many missing-records search teams are going to be formed, maybe some HackerOne 
for stream apps :D


Best Jan




At a minimum, handling this type of exception will need to involve the 
exactly-once (EoS) logic. We'd still allow the option of failing or skipping, 
but EoS would need to clean up by rolling back all the side effects from the 
processing so far. Matthias, how does this sound?

Eos will not help the record might be 5,6 repartitions down into the topology. 
I haven't followed but I pray you made EoS optional! We don't need this and we 
don't want this and we will turn it off if it comes. So I wouldn't recommend 
relying on it. The option to turn it off is better than forcing it and still 
beeing unable to rollback badpills (as explained before)

Yeah as Matthias mentioned EoS is optional.

Thanks,
Eno



6. Will add an end-to-end example as Michael suggested.

Thanks
Eno




On 4 Jun 2017, at 02:35, Matthias J. Sax  wrote:

What I don't understand is this:


 From there on its the easiest way forward: fix, redeploy, start => done

If you have many producers that work fine and a new "bad" producer
starts up and writes bad data into your input topic, your Streams app
dies but all your producers, including the bad one, keep writing.

Thus, how would you fix this, as you cannot "remove" the corrupted date
from the topic? It might take some time to identify the root cause and
stop the bad producer. Up to this point you get good and bad data into
your Streams input topic. If Streams app in not able to skip over those
bad records, how would you get all the good data from the topic? Not
saying it's not possible, but it's extra work copying the data with a
new non-Streams consumer-producer-app into a new topic and than feed
your Streams app from this new topic -- you also need to update all your
upstream producers to write to the new topic.

Thus, if you want to fail fast, you can still do this. And after you
detected and fixed the bad producer you might just reconfigure your app
to skip bad records until it reaches the good part of the data.
Afterwards, you could redeploy with fail-fast again.


Thus, for this pattern, I actually don't see any reason why to stop the
Streams ap

Re: Reliably implementing global KeyValueStore#get

2017-06-07 Thread Eno Thereska
Hi Steven,

You are right in principle. The thing is that what we shipped in Kafka is just 
the low level bare bones that in a sense belong to Kafka. A middle layer that 
keeps track of the data is absolutely needed, and it should hopefully hide the 
distributed system challenges from the end user. Now the question is what such a 
layer should look like. I think in all systems there are some basic assumptions 
made about frequency of failures and rebalances just to keep the number of 
retries sane. I agree with you that in principle a rebalance could always be 
happening though.

Eno

> On 6 Jun 2017, at 23:29, Steven Schlansker  wrote:
> 
> 
>> On Jun 6, 2017, at 2:52 PM, Damian Guy  wrote:
>> 
>> Steven,
>> 
>> In practice, data shouldn't be migrating that often. If it is then you
>> probably have bigger problems.
> 
> Understood and agreed, but when designing distributed systems, it usually
> helps to model for the worst case rather than the "well that should never
> happen" case, lest you find yourself fixing those bugs at 3am instead :)
> 
> I'd like to be able to induce extreme pain at the Kafka layer (change leader
> every 3 seconds and migrate all partitions around randomly) and still have
> my app behave correctly.
> 
>> You should be able to use the metadata api
>> to find the instance the key should be on and then when you check that node
>> you can also check with the metadata api that the key should still be on
>> this host. If streams is rebalancing while you query an exception will be
>> raised and you'll need to retry the request once the rebalance has
>> completed.
> 
> Agreed here as well.  But let's assume I have a very fast replication
> setup (assume it takes zero time, for the sake of argument) -- I'm fairly
> sure there's still a race here as this exception only fires *during a 
> migration*
> not *after a migration that may have invalidated your metadata lookup 
> completes*
> 
>> 
>> HTH,
>> Damian
>> 
>> On Tue, 6 Jun 2017 at 18:11 Steven Schlansker 
>> wrote:
>> 
>>> 
 On Jun 6, 2017, at 6:16 AM, Eno Thereska  wrote:
 
 Hi Steven,
 
 Do you know beforehand if a key exists? If you know that and are getting
>>> null() the code will have to retry by refreshing the metadata and going to
>>> the new instance. If you don’t know beforehand if a key exists or not you
>>> might have to check all instances of a store to make sure.
 
>>> 
>>> No, I am not presupposing that the key can exist -- this is a user visible
>>> API and will
>>> be prone to "accidents" :)
>>> 
>>> Thanks for the insight.  I worry that even checking all stores is not
>>> truly sufficient,
>>> as querying different all workers at different times in the presence of
>>> migrating data
>>> can still in theory miss it given pessimal execution.
>>> 
>>> I'm sure I've long wandered off into the hypothetical, but I dream of some
>>> day being
>>> cool like Jepsen :)
>>> 
 Eno
 
 
> On Jun 5, 2017, at 10:12 PM, Steven Schlansker <
>>> sschlans...@opentable.com> wrote:
> 
> Hi everyone, me again :)
> 
> I'm still trying to implement my "remoting" layer that allows
> my clients to see the partitioned Kafka Streams state
> regardless of which instance they hit.  Roughly, my lookup is:
> 
> Message get(Key key) {
> RemoteInstance instance = selectPartition(key);
> return instance.get(key); // http remoting
> }
> 
> RemoteInstance.get(Key key) { // http endpoint
> return readOnlyKeyValueStore.get(key);
> }
> 
> However, the mapping of partitions to instances may change.
> If you call KeyValueStore.get(K) where K is on a partition you
> don't own, it returns null.  This is indistinguishable from a
> successful get on a key that doesn't exist.
> 
> If one instance selects a sibling instance right as the partition is
>>> failing
> off of that instance, it may get routed there and by the time it gets
> the request no longer "owns" the partition -- returns a false 'null'.
> 
> You can try re-checking after you get a null value, but that's
>>> susceptible
> to the same race -- it's unlikely but possible that the data migrates
>>> *back*
> before you do this re-check.
> 
> Is there any way to correctly implement this without races?  I'd imagine
> you need a new primitive like KeyValueStore#get that atomically finds
> the key or throws an exception if it is not in an owned partition
> at the time of lookup so you know to recheck the partition and retry.
> 
> Thoughts?
> 
> Thanks again,
> Steven
> 
 
>>> 
>>> 
> 



Re: [DISCUSS]: KIP-161: streams record processing exception handlers

2017-06-07 Thread Eno Thereska
Comments inline:

> On 5 Jun 2017, at 18:19, Jan Filipiak  wrote:
> 
> Hi
> 
> just my few thoughts
> 
> On 05.06.2017 11:44, Eno Thereska wrote:
>> Hi there,
>> 
>> Sorry for the late reply, I was out this past week. Looks like good progress 
>> was made with the discussions either way. Let me recap a couple of points I 
>> saw into one big reply:
>> 
>> 1. Jan mentioned CRC errors. I think this is a good point. As these happen 
>> in Kafka, before Kafka Streams gets a chance to inspect anything, I'd like 
>> to hear the opinion of more Kafka folks like Ismael or Jason on this one. 
>> Currently the documentation is not great with what to do once a CRC check 
>> has failed. From looking at the code, it looks like the client gets a 
>> KafkaException (bubbled up from the fetcher) and currently we in streams 
>> catch this as part of poll() and fail. It might be advantageous to treat CRC 
>> handling in a similar way to serialisation handling (e.g., have the option 
>> to fail/skip). Let's see what the other folks say. Worst-case we can do a 
>> separate KIP for that if it proved too hard to do in one go.
> there is no reasonable way to "skip" a crc error. How can you know the length 
> you read was anything reasonable? you might be completely lost inside your 
> response.

On the client side, every record received is checked for validity. As it 
happens, if the CRC check fails the exception is wrapped with a KafkaException 
that is thrown all the way to poll(). Assuming we change that and poll() throws 
a CRC exception, I was thinking we could treat it similarly to a deserialize 
exception and pass it to the exception handler to decide what to do. Default 
would be to fail. This might need a Kafka KIP btw and can be done separately 
from this KIP, but Jan, would you find this useful?
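
To make the fail/skip choice concrete, here is a rough sketch of what such a pluggable
handler could look like. The interface, enum, and class names are hypothetical
illustrations for this discussion, not an existing Kafka or Kafka Streams API.

import org.apache.kafka.clients.consumer.ConsumerRecord;

// Hypothetical handler shape, only to illustrate the fail/skip decision.
interface RecordExceptionHandler {
    enum Response { FAIL, CONTINUE }
    Response handle(ConsumerRecord<byte[], byte[]> record, Exception exception);
}

// A "log and skip" policy built on the hypothetical interface above.
class LogAndSkipHandler implements RecordExceptionHandler {
    @Override
    public Response handle(ConsumerRecord<byte[], byte[]> record, Exception exception) {
        System.err.printf("Skipping record %s-%d@%d: %s%n",
                record.topic(), record.partition(), record.offset(), exception);
        return Response.CONTINUE; // return Response.FAIL to keep today's fail-fast behaviour
    }
}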

>> 
>> 
>> At a minimum, handling this type of exception will need to involve the 
>> exactly-once (EoS) logic. We'd still allow the option of failing or 
>> skipping, but EoS would need to clean up by rolling back all the side 
>> effects from the processing so far. Matthias, how does this sound?
> Eos will not help the record might be 5,6 repartitions down into the 
> topology. I haven't followed but I pray you made EoS optional! We don't need 
> this and we don't want this and we will turn it off if it comes. So I 
> wouldn't recommend relying on it. The option to turn it off is better than 
> forcing it and still beeing unable to rollback badpills (as explained before)
>> 

Yeah as Matthias mentioned EoS is optional.

Thanks,
Eno


>> 6. Will add an end-to-end example as Michael suggested.
>> 
>> Thanks
>> Eno
>> 
>> 
>> 
>>> On 4 Jun 2017, at 02:35, Matthias J. Sax  wrote:
>>> 
>>> What I don't understand is this:
>>> 
 From there on its the easiest way forward: fix, redeploy, start => done
>>> If you have many producers that work fine and a new "bad" producer
>>> starts up and writes bad data into your input topic, your Streams app
>>> dies but all your producers, including the bad one, keep writing.
>>> 
>>> Thus, how would you fix this, as you cannot "remove" the corrupted date
>>> from the topic? It might take some time to identify the root cause and
>>> stop the bad producer. Up to this point you get good and bad data into
>>> your Streams input topic. If Streams app in not able to skip over those
>>> bad records, how would you get all the good data from the topic? Not
>>> saying it's not possible, but it's extra work copying the data with a
>>> new non-Streams consumer-producer-app into a new topic and than feed
>>> your Streams app from this new topic -- you also need to update all your
>>> upstream producers to write to the new topic.
>>> 
>>> Thus, if you want to fail fast, you can still do this. And after you
>>> detected and fixed the bad producer you might just reconfigure your app
>>> to skip bad records until it reaches the good part of the data.
>>> Afterwards, you could redeploy with fail-fast again.
>>> 
>>> 
>>> Thus, for this pattern, I actually don't see any reason why to stop the
>>> Streams app at all. If you have a callback, and use the callback to
>>> raise an alert (and maybe get the bad data into a bad record queue), it
>>> will not take longer to identify and stop the "bad" producer. But for
>>> this case, you have zero downtime for your Streams app.
>>> 
>>> This seems to be much simpler. Or do I miss anything?
>>> 
>>> 
>>> Having said this, I agree that the "threshold based callback" might be
>>> questionable. But as you argue for strict "fail-fast", I want to argue
>>> that this must not always be the best pattern to apply and that the
>>> overall KIP idea is super useful from my point of view.
>>> 
>>> 
>>> -Matthias
>>> 
>>> 
>>> On 6/3/17 11:57 AM, Jan Filipiak wrote:
 Could not agree more!
 
 But then I think the easiest is still: print exception and die.
 From there on its the easiest way forward: fix, redeploy, start => done
 
 All the other ways to recover

Re: Kafka Streams Failed to rebalance error

2017-06-07 Thread João Peixoto
I'm using Kafka Streams 0.10.2.1 and I still see this error

2017-06-07 20:28:37.211  WARN 73 --- [ StreamThread-1]
o.a.k.s.p.internals.StreamThread : Could not create task 0_31. Will
retry.

org.apache.kafka.streams.errors.LockException: task [0_31] Failed to lock
the state directory for task 0_31
at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.(ProcessorStateManager.java:100)
~[kafka-streams-0.10.2.1.jar!/:na]
at
org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:73)
~[kafka-streams-0.10.2.1.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:108)
~[kafka-streams-0.10.2.1.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
[kafka-streams-0.10.2.1.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
~[kafka-streams-0.10.2.1.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
~[kafka-streams-0.10.2.1.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
[kafka-streams-0.10.2.1.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
[kafka-streams-0.10.2.1.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
[kafka-streams-0.10.2.1.jar!/:na]
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
[kafka-clients-0.10.2.1.jar!/:na]
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
[kafka-clients-0.10.2.1.jar!/:na]
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
[kafka-clients-0.10.2.1.jar!/:na]
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
[kafka-clients-0.10.2.1.jar!/:na]
at
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
[kafka-clients-0.10.2.1.jar!/:na]
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
[kafka-clients-0.10.2.1.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
[kafka-streams-0.10.2.1.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
[kafka-streams-0.10.2.1.jar!/:na]


It has been printing it for hours now, so it does not recover at all.
The most worrying thing is that this stream definition does not even use
state stores, it literally looks like this:

KStreamBuilder builder = new KStreamBuilder();
KStream kStream = builder.stream(appOptions.getInput().getTopic());
kStream.process(() -> processor);
new KafkaStreams(builder, streamsConfiguration);

The "processor" does its thing and calls "context().commit()" when done.
That's it. Looking at the actual machine running the instance, the folders
under /tmp/kafka-streams// only have a .lock file.

This seems to have been bootstrapped by the exception:

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
completed since the group has already rebalanced and assigned the
partitions to another member. This means that the time between subsequent
calls to poll() was longer than the configured max.poll.interval.ms, which
typically implies that the poll loop is spending too much time message
processing. You can address this either by increasing the session timeout
or by reducing the maximum size of batches returned in poll() with
max.poll.records.

We are addressing the latter by reducing "max.poll.records" and increasing "
commit.interval.ms", nonetheless, shouldn't Kafka Streams not worry about
state dirs if there are no state stores? Since it doesn't seem to do so
automatically, can I configure it somehow to achieve this end?

Additionally, what could lead to it not being able to recover?
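
A minimal sketch of that tuning, passed through the Streams configuration (the values,
application id, and bootstrap servers below are placeholders, not recommendations):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsTuningSketch {
    public static Properties buildConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // Hand fewer records to the processor per poll() so each loop iteration finishes sooner.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        // Allow more time between poll() calls before the group coordinator evicts the member.
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);
        // Commit less frequently to reduce overhead per processed batch.
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);
        return props;
    }
}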

On Tue, May 16, 2017 at 3:17 PM Matthias J. Sax 
wrote:

> Great! :)
>
> On 5/16/17 2:31 AM, Sameer Kumar wrote:
> > I see now that my Kafka cluster is very stable, and these errors dont
> come
> > now.
> >
> > -Sameer.
> >
> > On Fri, May 5, 2017 at 7:53 AM, Sameer Kumar 
> wrote:
> >
> >> Yes, I have upgraded my cluster and client both to version 10.2.1 and
> >> currently monitoring the situation.
> >> Will report back in case I find any errors. Thanks for the help though.
> >>
> >> -Sameer.
> >>
> >> On Fri, May 5, 2017 at 3:37 AM, Matthias J. Sax 
> >> wrote:
> >>
> >>> Did you see Eno's reply?
> >>>
> >>> Please try out Streams 0.10.2.1 -- this should be fixed there. If not,
> >>> please report back.
> >>>
> >>> I would also recommend to subscribe to the list. It's self-service
> >>> http://kafka.apache.org/contact
> >>>
> >>>
> >>> -Matthias
> >>>
> >>> On 5/3

Re: Async Non Blocking Kafka Producer

2017-06-07 Thread Ankit Jain
Thanks hans.

It would work, but the producer will start losing data even when the cluster is
available.

Thanks
Ankit Jain

On Wed, Jun 7, 2017 at 12:56 PM, Hans Jespersen  wrote:

> Try adding props.put("max.block.ms", "0");
>
> -hans
>
>
>
> > On Jun 7, 2017, at 12:24 PM, Ankit Jain  wrote:
> >
> > Hi,
> >
> > We want to use the non blocking Kafka producer. The producer thread
> should
> > not block if the Kafka is cluster is down or not reachable.
> >
> > Currently, we are setting following properties but the Producer thread is
> > still blocking if the Kafka cluster goes gown or unreachable.
> >
> > * props.put("block.on.buffer.full", "false");*
> > * props.put("acks", "0");*
> >
> > --
> > Thanks
>
>


-- 
Thanks,
Ankit Jain


Re: Async Non Blocking Kafka Producer

2017-06-07 Thread Hans Jespersen
Try adding props.put("max.block.ms", "0");

-hans



> On Jun 7, 2017, at 12:24 PM, Ankit Jain  wrote:
> 
> Hi,
> 
> We want to use the non blocking Kafka producer. The producer thread should
> not block if the Kafka is cluster is down or not reachable.
> 
> Currently, we are setting following properties but the Producer thread is
> still blocking if the Kafka cluster goes gown or unreachable.
> 
> * props.put("block.on.buffer.full", "false");*
> * props.put("acks", "0");*
> 
> -- 
> Thanks
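
A small sketch of this suggestion. As Ankit notes in his reply, the trade-off is that
with max.block.ms=0 and acks=0 the send path should not block, but records are dropped
whenever metadata or buffer space is unavailable. The broker address and topic below
are placeholders.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NonBlockingProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "0");          // fire-and-forget, no broker acknowledgement
        props.put("max.block.ms", "0");  // do not wait for metadata or buffer space in send()

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The callback is invoked asynchronously; a non-null exception means the record was dropped.
            producer.send(new ProducerRecord<>("my-topic", "key", "value"), (metadata, exception) -> {
                if (exception != null) {
                    System.err.println("Record dropped: " + exception);
                }
            });
        }
    }
}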



FATAL [mirrormaker-thread-1] Mirror maker thread failure due to (kafka.tools.MirrorMaker$MirrorMakerThread)

2017-06-07 Thread Fernando Vega
Can someone please help me with this error? It started happening after upgrading
from 0.8.2 to 0.10.2.1.
It seems to be an issue with my consumers, but I cannot determine what is happening.


INFO Kafka commitId : e89bffd6b2eff799
(org.apache.kafka.common.utils.AppInfoParser)

[2017-06-07 12:24:45,497] INFO [mirrormaker-thread-0] Starting mirror maker
thread mirrormaker-thread-0 (kafka.tools.MirrorMaker$MirrorMakerThread)

[2017-06-07 12:24:45,497] INFO [mirrormaker-thread-1] Starting mirror maker
thread mirrormaker-thread-1 (kafka.tools.MirrorMaker$MirrorMakerThread)

[2017-06-07 12:24:48,619] INFO Discovered coordinator
app458.sjc2.mytest.com:9092 (id: 2147483613 rack: null) for group
MirrorMaker_hkg1.
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)

[2017-06-07 12:24:48,620] INFO Discovered coordinator
app458.sjc2.mytest.com:9092 (id: 2147483613 rack: null) for group
MirrorMaker_hkg1.
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)

[2017-06-07 12:24:48,625] INFO Revoking previously assigned partitions []
for group MirrorMaker_hkg1
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

[2017-06-07 12:24:48,625] INFO Revoking previously assigned partitions []
for group MirrorMaker_hkg1
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

[2017-06-07 12:24:48,648] INFO (Re-)joining group MirrorMaker_hkg1
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)

[2017-06-07 12:24:48,649] INFO (Re-)joining group MirrorMaker_hkg1
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)

[2017-06-07 12:24:53,560] FATAL [mirrormaker-thread-1] Mirror maker thread
failure due to  (kafka.tools.MirrorMaker$MirrorMakerThread)

org.apache.kafka.common.KafkaException: Unexpected error from SyncGroup:
The server experienced an unexpected error when processing the request

at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:548)

at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:521)

at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:784)

at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:765)

at
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)

at
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)

at
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)

at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:493)

at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:322)

at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253)

at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172)

at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:347)

at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)

at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)

at
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)

at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)

at
kafka.tools.MirrorMaker$MirrorMakerNewConsumer.receive(MirrorMaker.scala:625)

at kafka.tools.MirrorMaker$MirrorMakerThread.run(MirrorMaker.scala:431)


Async Non Blocking Kafka Producer

2017-06-07 Thread Ankit Jain
Hi,

We want to use a non-blocking Kafka producer. The producer thread should
not block if the Kafka cluster is down or not reachable.

Currently, we are setting the following properties, but the producer thread is
still blocking if the Kafka cluster goes down or becomes unreachable.

* props.put("block.on.buffer.full", "false");*
* props.put("acks", "0");*

-- 
Thanks


Re: Kafka Over TLS Error - Failed to send SSL Close message - Broken Pipe

2017-06-07 Thread IT Consultant
Hi All ,

Thanks a lot for your help .

A bug has been logged for said issue and can be found at ,

https://issues.apache.org/jira/browse/KAFKA-5401


Thanks again .

On Sun, Jun 4, 2017 at 6:38 PM, Martin Gainty  wrote:

>
> 
> From: IT Consultant <0binarybudd...@gmail.com>
> Sent: Friday, June 2, 2017 11:02 AM
> To: users@kafka.apache.org
> Subject: Kafka Over TLS Error - Failed to send SSL Close message - Broken
> Pipe
>
> Hi All,
>
> I have been seeing below error since three days ,
>
> Can you please help me understand more about this ,
>
>
> WARN Failed to send SSL Close message
> (org.apache.kafka.common.network.SslTransportLayer)
> java.io.IOException: Broken pipe
>  at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>  at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
>  at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>  at sun.nio.ch.IOUtil.write(IOUtil.java:65)
>  at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
>  at
> org.apache.kafka.common.network.SslTransportLayer.
> flush(SslTransportLayer.java:194)
>
> MG>Here is org.apache.kafka.common.network.SslTransportLayer code:
> /**
> * Flushes the buffer to the network, non blocking
> * @param buf ByteBuffer
> * @return boolean true if the buffer has been emptied out, false
> otherwise
> * @throws IOException
> */
> private boolean flush(ByteBuffer buf) throws IOException {
>     int remaining = buf.remaining();
>     if (remaining > 0) {
>         int written = socketChannel.write(buf); // no check for isOpen() *socketChannel.isOpen()*
>         return written >= remaining;
>     }
>     return true;
> }
>
> MG>it appears upstream monitor *container* closed connection but kafka
> socketChannel never tested (now-closed) connection with isOpen()
> MG>i think you found a bug
> MG>can you file bug in kafka-jira ?
> https://issues.apache.org/jira/browse/KAFKA/?selectedTab=com.atlassian.
> jira.jira-projects-plugin:summary-panel
>
>
>
>
>  at
> org.apache.kafka.common.network.SslTransportLayer.
> close(SslTransportLayer.java:148)
>  at
> org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:45)
>  at
> org.apache.kafka.common.network.Selector.close(Selector.java:442)
>  at org.apache.kafka.common.network.Selector.poll(
> Selector.java:310)
>  at
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
>  at
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
>  at
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
>  at java.lang.Thread.run(Thread.java:745)
>
>
> Thanks  a lot.
>


Re: Finding StreamsMetadata with value-dependent partitioning

2017-06-07 Thread Steven Schlansker
Thank you for the idea, I'll keep that in mind if I run into limitations of
my current approach.

> On Jun 6, 2017, at 5:50 PM, Guozhang Wang  wrote:
> 
> Thanks Steven, interesting use case.
> 
> The current streams state store metadata discovery is assuming the
> `DefaultStreamPartitioner` is used, which is a limitation for such cases.
> 
> Another workaround that I can think of is, that you can first partition on
> D in the first stage to let the workers to the "real" work, then you can
> pipe it to a second stage where you re-partition on K, and the second
> processor is only for materializing the store for querying. I'm not sure if
> it would be better since it may require doubling the store spaces (one on
> the first processor and one on the second), and since you can hold the
> whole K -> D map in a global state it seems this map is small enough so
> maybe not worth the repartitioning.
> 
> 
> Guozhang
> 
> 
> 
> 
> 
> 
> On Tue, Jun 6, 2017 at 8:36 AM, Michael Noll  wrote:
> 
>> Happy to hear you found a working solution, Steven!
>> 
>> -Michael
>> 
>> 
>> 
>> On Sat, Jun 3, 2017 at 12:53 AM, Steven Schlansker <
>> sschlans...@opentable.com> wrote:
>> 
 
 On Jun 2, 2017, at 3:32 PM, Matthias J. Sax 
>>> wrote:
 
 Thanks. That helps to understand the use case better.
 
 Rephrase to make sure I understood it correctly:
 
 1) you are providing a custom partitioner to Streams that is base on
>> one
 field in your value (that's fine with regard to fault-tolerance :))
 2) you want to use interactive queries to query the store
 3) because of your custom partitioning schema, you need to manually
 figure out the right application instance that hosts a key
 4) thus, you use a GlobalKTable to maintain the information from K to D
 and thus to the partition ie, streams instance that hosts K
 
 If this is correct, than you cannot use the "by key" metadata
>> interface.
 It's designed to find the streams instance base in the key only -- but
 your partitioner is based on the value. Internally, we call
 
> final Integer partition = partitioner.partition(key, null,
>>> sourceTopicsInfo.maxPartitions);
 
 Note, that `value==null` -- at this point, we don't have any value
 available and can't provide it to the partitioner.
 
 Thus, your approach to get all metadata is the only way you can go.
>>> 
>>> Thanks for confirming this.  The code is a little ugly but I've done
>> worse
>>> :)
>>> 
 
 
 Very interesting (and quite special) use case. :)
 
 
 -Matthias
 
 On 6/2/17 2:32 PM, Steven Schlansker wrote:
> 
>> On Jun 2, 2017, at 2:11 PM, Matthias J. Sax 
>>> wrote:
>> 
>> I am not sure if I understand the use case correctly. Could you give
>> some more context?
> 
> Happily, thanks for thinking about this!
> 
>> 
>>> backing store whose partitioning is value dependent
>> 
>> In infer that you are using a custom store and not default RocksDB?
>> If
>> yes, what do you use? What does "value dependent" mean in this
>> context?
> 
> We're currently using the base in memory store.  We tried to use
>> RocksDB
> but the tuning to get it running appropriately in a Linux container
>>> without
> tripping the cgroups OOM killer is nontrivial.
> 
> 
>> Right now, I am wondering, why you not just set a new key to get your
>> data grouped by the field you are interesting in? Also, if you don't
>> partitioned your data by key, you might break your streams
>> application
>> with regard to fault-tolerance -- or does your custom store not rely
>> on
>> changelog backup for fault-tolerance?
>> 
> 
> That's an interesting point about making transformed key.  But I don't
>>> think
> it simplifies my problem too much.  Essentially, I have a list of
>>> messages
> that should get delivered to destinations.  Each message has a primary
>>> key K
> and a destination D.
> 
> We partition over D so that all messages to the same destination are
>>> handled by
> the same worker, to preserve ordering and implement local rate limits
>>> etc.
> 
> I want to preserve the illusion to the client that they can look up a
>>> key with
> only K.  So, as an intermediate step, we use the GlobalKTable to look
>>> up D.  Once
> we have K,D we can then compute the partition and execute a lookup.
> 
> Transforming the key to be a composite K,D isn't helpful because the
>>> end user still
> only knows K -- D's relevance is an implementation detail I wish to
>>> hide -- so you still
> need some sort of secondary lookup.
> 
> We do use the changelog backup for fault tolerance -- how would having
>>> the partition
> based on the value break this?  Is the changelog implicitly
>> partitioned
>>> by a partitioner
> other than the one we give to the topology?
>
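
A rough sketch of the "get all metadata" lookup described in this thread, under stated
assumptions: the K-to-D mapping is readable from a GlobalKTable-backed store, the
partition count of the destination-partitioned topic is known, and the hash below
stands in for the custom value-based StreamPartitioner. All store and type names are
illustrative, not Steven's actual code.

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.StreamsMetadata;

class DestinationAwareRouter {
    private final KafkaStreams streams;
    private final int numPartitions; // partitions of the destination-partitioned topic (assumed known)

    DestinationAwareRouter(KafkaStreams streams, int numPartitions) {
        this.streams = streams;
        this.numPartitions = numPartitions;
    }

    HostInfo ownerFor(String key) {
        // 1. Resolve K -> D from the GlobalKTable-backed store that is replicated to every instance.
        ReadOnlyKeyValueStore<String, String> destinations =
                streams.store("destinations-store", QueryableStoreTypes.<String, String>keyValueStore());
        String destination = destinations.get(key);
        if (destination == null) {
            return null; // unknown key
        }
        // 2. Compute the partition from the value, mirroring the custom StreamPartitioner.
        int partition = (destination.hashCode() & 0x7fffffff) % numPartitions;
        // 3. Scan all instances' metadata for the one currently hosting that partition of the message store.
        for (StreamsMetadata metadata : streams.allMetadataForStore("messages-store")) {
            for (TopicPartition tp : metadata.topicPartitions()) {
                if (tp.partition() == partition) {
                    return metadata.hostInfo(); // forward the HTTP get(key) to this instance
                }
            }
        }
        return null; // no owner known right now; the caller should retry
    }
}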

Re: kafka streams changelog topic value

2017-06-07 Thread john cheng
The right way to see the changelog values persisted by RocksDB is to use the
ByteArrayDeserializer and then decode the bytes into a hex string:

props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArrayDeserializer");

for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
    System.out.println(bytesToHexString(record.value()));
}

public static String bytesToHexString(byte[] src){
StringBuilder stringBuilder = new StringBuilder("");
if (src == null || src.length <= 0) {
return null;
}
for (int i = 0; i < src.length; i++) {
int v = src[i] & 0xFF;
String hv = Integer.toHexString(v);
if (hv.length() < 2) {
stringBuilder.append(0);
}
stringBuilder.append(hv);
}
return stringBuilder.toString();
}

The output is now correct:

p:0,o:0,k:msg1,v:0001
p:0,o:1,k:msg3,v:0001
p:0,o:2,k:msg5,v:0001
p:0,o:3,k:msg1,v:0002
p:0,o:4,k:msg3,v:0002
p:1,o:0,k:msg2,v:0001
p:1,o:1,k:msg4,v:0001
p:1,o:2,k:msg2,v:0002
p:1,o:3,k:msg2,v:0003
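
Since the Counts store in WordCountProcessorDemo holds Integer counts, those 4-byte
values are just big-endian integers (assuming the store's Integer serde), so they can
also be decoded directly instead of printed as hex; a small sketch:

import java.nio.ByteBuffer;

public class ChangelogValueDecoder {
    // Decodes the 4-byte big-endian payload written by the Integer serializer.
    static int toCount(byte[] value) {
        return ByteBuffer.wrap(value).getInt();
    }

    public static void main(String[] args) {
        byte[] sample = {0, 0, 0, 2};        // e.g. the "0002" value for msg1 above
        System.out.println(toCount(sample)); // prints 2
    }
}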


2017-06-07 18:42 GMT+08:00 john cheng :

> I add some log on StoreChangeLog
>
> for (K k : this.dirty) {
> V v = getter.get(k);
> log.info("logChange key:{},value:{}", k, v);
> collector.send(new ProducerRecord<>(this.topic, this.partition, k, v), 
> keySerializer, valueSerializer);
> }
>
> and found the print result is normal, just some byte:
>
> [2017-06-07 18:39:43,131] INFO logChange removed:[], dirty:[kafka] 
> (org.apache.kafka.streams.state.internals.StoreChangeLogger)
> [2017-06-07 18:39:43,132] INFO logChange key:kafka,value:[0, 0, 0, 2] 
> (org.apache.kafka.streams.state.internals.StoreChangeLogger)
>
> but console-consumer still output kafka:
>
> so this may be problem of console-consume.
>
>
> 2017-06-07 18:24 GMT+08:00 john cheng :
>
>> I'm running WordCountProcessorDemo with Processor API. and change
>> something below
>> 1. config 1 stream-thread and 1 replicas
>> 2. change inMemory() to persistent()
>> MyKakfa version is 0.10.0.0. After running streaming application, I check
>> msg output by console-consumer
>> ➜  kafka_2.10-0.10.0.0 bin/kafka-console-producer.sh --broker-list
>> localhost:9092 --topic streams-input2
>> msg1  # by console-producer, we can only produce message's value. so
>> message produce to input topic will use roundrobbin partition
>> msg2
>> msg3
>> msg4
>> msg5
>> msg6
>> ➜  kafka_2.10-0.10.0.0 bin/kafka-console-consumer.sh --new-consumer
>> --bootstrap-server localhost:9092 --topic streams-input2 --property
>> print.key=true --property key.separator=":" --from-beginning
>> null:msg2  # key is null, value is what we produce above
>> null:msg4
>> null:msg6
>> null:msg1
>> null:msg3
>> null:msg5
>> ➜  kafka_2.10-0.10.0.0 bin/kafka-console-consumer.sh --new-consumer
>> --bootstrap-server localhost:9092 --topic streams-output2 --property
>> print.key=true --property key.separator=":" --from-beginning
>> msg2:1
>> msg1:1
>> msg1:1
>> msg3:1
>> msg2:1
>> msg4:1
>> msg1:1
>> msg3:1
>> msg5:1
>> msg2:1
>> msg4:1
>> msg6:1  # due to log compaction, same key will be overwrite. this is ok...
>> ➜  kafka_2.10-0.10.0.0 bin/kafka-console-consumer.sh --new-consumer
>> --bootstrap-server localhost:9092 --topic streams-wordcount-Counts-changelog
>> --property print.key=true --property key.separator=":" --from-beginning
>> *msg2:*
>> *msg4:*
>> *msg6:*
>> *msg1:*
>> *msg3:*
>> *msg5:*
>> Everything is ok, except changelog-topic trouble me. Why it's value is
>> empty?
>>
>> I have dig into source code, and summary the workflow :
>> 1. when WordCountProcessorDemo's ProcessorNode put kv to KeyValueStore,
>> no matter Memory or RocksDB, they both put into local storage, then append
>> msg to StoreChangeLogger
>> 2. the key append to StoreChangeLogger first, and invoke maybeLogChange
>> by passing ValueGetter, this getter will get value from local storage when
>> logChange() operation happen
>> 3. when logChange() on StoreChangeLogger happen, send KeyValue message to
>> changelog topic, here is  streams-wordcount-Counts-changelog
>> 4. StoreChangeLogger use dirty and remove Set to swap between logChange()
>> and add().
>>
>> for (K k : this.dirty) { // logChange() method flush dirty to changelog topic
>> V v = getter.get(k);  // value getter will get value from local storage
>> collector.send(new ProducerRecord<>(this.topic, this.partition, k, v), 
>> keySerializer, valueSerializer);
>> }
>>
>>
>> The Only reason I can figure out why value is empty is when invoke
>> KeyValueStore's delete method. But Actualy the application did't do it
>>
>> Anyone help me, Tks .
>>
>>
>


Re: Cluster in weird state: no leaders no ISR for all topics, but it works!

2017-06-07 Thread Del Barrio, Alberto
Hi,

I have followed the instructions you detail and I could create topics,
which were getting a leader and were properly replicated.
I think the problem I experienced was due to some old temporary
communication problems between Kafka and Zookeeper. But that's only a guess.

Thanks a lot Mohammed for your time.

Alberto del Barrio.

On 5 June 2017 at 10:38, Mohammed Manna  wrote:

> Hi,
>
> I setup a fresh cluster (3-brokers, 3-keepers) and created a topic
> according to your settings - obviously the log directories are kept
> separeate e.g. (var/lib/zookeeper2 and var/lib/zookeeper3) and not to
> mention, the myid files for every zookeeper to identify themselves in the
> ensemble. Cannot see any issues.
>
> Do you mind trying it out with a fresh quorum information - please do the
> following:
>
> 1) Delete the zookeeper logs from the log and dataLogDir locations
> 2) in the zookeeper.properties for Kafka ensure that the zookeeper config
> specifies different log directories for every zookeeper.
> 3) Restart the cluster in the following pattern:
> a) Start your zookeepers and allow ~5s waiting time between each,
> b) Start your kafka brokers and allow ~5s waiting time between each
> other.
>
> 4) Create a topic using console utility and do a --describe on the topic -
>
>
> let us know.
>
> KR,
>
>
>
> On 2 June 2017 at 13:55, Del Barrio, Alberto <
> alberto.delbar...@360dialog.com> wrote:
>
> > So, I fixed the problem doing a rolling restart, and after some checks
> > seems there was no data loss.
> >
> > On 1 June 2017 at 17:57, Del Barrio, Alberto <
> > alberto.delbar...@360dialog.com> wrote:
> >
> > > I might give it a try tomorrow. The reason for having so large init and
> > > sync limit times is because in the past our ZK cluster was storing
> large
> > > amount of data, and lower values were not enough for the server syncs
> > when
> > > restarting zk processes.
> > >
> > > On 1 June 2017 at 17:52, Mohammed Manna  wrote:
> > >
> > >> Cool - I will try and take a look into this - Meanwhile, do you mind
> > >> awfuly
> > >> to change the following and see if things improve?
> > >>
> > >> tickTime = 1000
> > >> initLimit=3
> > >> syncLimit=5
> > >>
> > >> On 1 June 2017 at 16:49, Del Barrio, Alberto <
> > >> alberto.delbar...@360dialog.com> wrote:
> > >>
> > >> > Here are the configs you were asking for:
> > >> >
> > >> > Zookeeper:
> > >> > tickTime=1000
> > >> > initLimit=2000
> > >> > syncLimit=1000
> > >> > dataDir=/var/lib/zookeeper
> > >> > clientPort=2181
> > >> > server.3=10.0.0.3:2888:3888
> > >> > server.2=10.0.0.2:2888:3888
> > >> > server.1=10.0.0.1:2888:3888
> > >> >
> > >> >
> > >> > Kafka broker (for one of them):
> > >> > broker.id=10
> > >> > listeners=PLAINTEXT://10.0.0.4:9092
> > >> > num.network.threads=3
> > >> > num.io.threads=8
> > >> > socket.send.buffer.bytes=102400
> > >> > socket.receive.buffer.bytes=102400
> > >> > socket.request.max.bytes=104857600
> > >> > log.dirs=/var/lib/kafka
> > >> > num.partitions=2
> > >> > num.recovery.threads.per.data.dir=1
> > >> > zookeeper.connect=10.0.0.1:2181,10.0.0.2:2181,10.0.0.3:2181/kafka
> > >> > zookeeper.connection.timeout.ms=6000
> > >> >
> > >> > In general they're pretty much the default ones.
> > >> > I can see in Zookeeper the kafka brokers connected to it and
> > exchanging
> > >> > data...
> > >> >
> > >> > Thanks for your help and time.
> > >> >
> > >> > On 1 June 2017 at 17:32, Mohammed Manna  wrote:
> > >> >
> > >> > > Could you please share your broker/zookeeper/topic configs ?
> > >> > >
> > >> > > On 1 June 2017 at 16:18, Del Barrio, Alberto <
> > >> > > alberto.delbar...@360dialog.com> wrote:
> > >> > >
> > >> > > > I tried creating the topic and results are very similar to the
> > >> current
> > >> > > > situation: there are not ISR and no leader for any of the
> > >> partitions,
> > >> > but
> > >> > > > now kafka-topics shows *Leader: none* when for all the other
> > >> topics, it
> > >> > > > shows *Leader: -1*
> > >> > > >
> > >> > > >
> > >> > > > On 1 June 2017 at 17:05, Mohammed Manna 
> > wrote:
> > >> > > >
> > >> > > > > I had a similar situation, but only 1 of my ZKs was
> struggling -
> > >> but
> > >> > > > since
> > >> > > > > the ISR synching time is configurable I was confident to
> bounce
> > 1
> > >> ZK
> > >> > > at a
> > >> > > > > time and it worked out.
> > >> > > > > does it happen even when you create a new topic with a
> > >> > > > > replication:partition ration of 1?
> > >> > > > >
> > >> > > > > i meant, 3 replicas, 3 partitions :)
> > >> > > > >
> > >> > > > > On 1 June 2017 at 15:58, Del Barrio, Alberto <
> > >> > > > > alberto.delbar...@360dialog.com> wrote:
> > >> > > > >
> > >> > > > > > Hi Mohammed,
> > >> > > > > >
> > >> > > > > > thanks for your answer.
> > >> > > > > > The ZK cluster is not located in the servers where Kafka
> runs
> > >> but
> > >> > in
> > >> > > > > other
> > >> > > > > > 3 different machines. This ZK cluster is used by several
> other
> > >> > > services
> 

Re: Debugging Kafka Streams Windowing

2017-06-07 Thread Garrett Barton
Mahendra,

Did increasing those two properties do the trick?  I am running into this
exact issue testing Streams out on a single Kafka instance.  Yet I can
manually start a consumer and read the topics fine while it's busy doing
this dead stuff.

On Tue, May 23, 2017 at 12:30 AM, Mahendra Kariya <
mahendra.kar...@go-jek.com> wrote:

> On 22 May 2017 at 16:09, Guozhang Wang  wrote:
>
> > For
> > that issue I'd suspect that there is a network issue, or maybe the
> network
> > is just saturated already and the heartbeat request / response were not
> > exchanged in time between the consumer and the broker, or the sockets
> being
> > dropped because of socket limit. Under this cases not all consumers may
> be
> > affected, but since the associated issue is from "AbstractCoordinator"
> > class which is part of the consumer client, I'd still be surprised if it
> is
> > actually due to Streams itself with the same consumer config settings,
> but
> > not to consumers.
> >
>
> Yes. This is the conclusion that even we are coming to after further
> investigation. But didn't want to post it here until we were sure.
>
> We are experimenting with increasing the default timeouts, particularly
> heartbeat.interval.ms and session.timeout.ms. So far, the things have been
> running fine. But we will let it run for a few more days before closing
> this issue.
>
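
For reference, a minimal sketch of the two settings being increased here, as plain
consumer properties (the values are illustrative only, not recommendations):

import java.util.Properties;

public class TimeoutTuningSketch {
    public static Properties buildConfig() {
        Properties props = new Properties();
        // How long the broker waits without a heartbeat before evicting the member from the group.
        props.put("session.timeout.ms", "30000");
        // How often the consumer sends heartbeats; keep this well below session.timeout.ms.
        props.put("heartbeat.interval.ms", "10000");
        return props;
    }
}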


Re: kafka streams changelog topic value

2017-06-07 Thread john cheng
I add some log on StoreChangeLog

for (K k : this.dirty) {
    V v = getter.get(k);
    log.info("logChange key:{},value:{}", k, v);
    collector.send(new ProducerRecord<>(this.topic, this.partition, k, v), keySerializer, valueSerializer);
}

and found the print result is normal, just some byte:

[2017-06-07 18:39:43,131] INFO logChange removed:[], dirty:[kafka]
(org.apache.kafka.streams.state.internals.StoreChangeLogger)
[2017-06-07 18:39:43,132] INFO logChange key:kafka,value:[0, 0, 0, 2]
(org.apache.kafka.streams.state.internals.StoreChangeLogger)

but the console-consumer still prints "kafka:" with an empty value,

so this may be a problem with the console consumer.


2017-06-07 18:24 GMT+08:00 john cheng :

> I'm running WordCountProcessorDemo with Processor API. and change
> something below
> 1. config 1 stream-thread and 1 replicas
> 2. change inMemory() to persistent()
> MyKakfa version is 0.10.0.0. After running streaming application, I check
> msg output by console-consumer
> ➜  kafka_2.10-0.10.0.0 bin/kafka-console-producer.sh --broker-list
> localhost:9092 --topic streams-input2
> msg1  # by console-producer, we can only produce message's value. so
> message produce to input topic will use roundrobbin partition
> msg2
> msg3
> msg4
> msg5
> msg6
> ➜  kafka_2.10-0.10.0.0 bin/kafka-console-consumer.sh --new-consumer
> --bootstrap-server localhost:9092 --topic streams-input2 --property
> print.key=true --property key.separator=":" --from-beginning
> null:msg2  # key is null, value is what we produce above
> null:msg4
> null:msg6
> null:msg1
> null:msg3
> null:msg5
> ➜  kafka_2.10-0.10.0.0 bin/kafka-console-consumer.sh --new-consumer
> --bootstrap-server localhost:9092 --topic streams-output2 --property
> print.key=true --property key.separator=":" --from-beginning
> msg2:1
> msg1:1
> msg1:1
> msg3:1
> msg2:1
> msg4:1
> msg1:1
> msg3:1
> msg5:1
> msg2:1
> msg4:1
> msg6:1  # due to log compaction, same key will be overwrite. this is ok...
> ➜  kafka_2.10-0.10.0.0 bin/kafka-console-consumer.sh --new-consumer
> --bootstrap-server localhost:9092 --topic streams-wordcount-Counts-changelog
> --property print.key=true --property key.separator=":" --from-beginning
> *msg2:*
> *msg4:*
> *msg6:*
> *msg1:*
> *msg3:*
> *msg5:*
> Everything is ok, except changelog-topic trouble me. Why it's value is
> empty?
>
> I have dig into source code, and summary the workflow :
> 1. when WordCountProcessorDemo's ProcessorNode put kv to KeyValueStore, no
> matter Memory or RocksDB, they both put into local storage, then append msg
> to StoreChangeLogger
> 2. the key append to StoreChangeLogger first, and invoke maybeLogChange by
> passing ValueGetter, this getter will get value from local storage when
> logChange() operation happen
> 3. when logChange() on StoreChangeLogger happen, send KeyValue message to
> changelog topic, here is  streams-wordcount-Counts-changelog
> 4. StoreChangeLogger use dirty and remove Set to swap between logChange()
> and add().
>
> for (K k : this.dirty) { // logChange() method flush dirty to changelog topic
> V v = getter.get(k);  // value getter will get value from local storage
> collector.send(new ProducerRecord<>(this.topic, this.partition, k, v), 
> keySerializer, valueSerializer);
> }
>
>
> The Only reason I can figure out why value is empty is when invoke
> KeyValueStore's delete method. But Actualy the application did't do it
>
> Anyone help me, Tks .
>
>


kafka streams changelog topic value

2017-06-07 Thread john cheng
I'm running WordCountProcessorDemo with Processor API. and change something
below
1. config 1 stream-thread and 1 replicas
2. change inMemory() to persistent()
My Kafka version is 0.10.0.0. After running the streaming application, I check
the msg output with the console-consumer
➜  kafka_2.10-0.10.0.0 bin/kafka-console-producer.sh --broker-list
localhost:9092 --topic streams-input2
msg1  # with the console-producer we can only produce the message's value, so
messages produced to the input topic will use round-robin partitioning
msg2
msg3
msg4
msg5
msg6
➜  kafka_2.10-0.10.0.0 bin/kafka-console-consumer.sh --new-consumer
--bootstrap-server localhost:9092 --topic streams-input2 --property
print.key=true --property key.separator=":" --from-beginning
null:msg2  # key is null, value is what we produce above
null:msg4
null:msg6
null:msg1
null:msg3
null:msg5
➜  kafka_2.10-0.10.0.0 bin/kafka-console-consumer.sh --new-consumer
--bootstrap-server localhost:9092 --topic streams-output2 --property
print.key=true --property key.separator=":" --from-beginning
msg2:1
msg1:1
msg1:1
msg3:1
msg2:1
msg4:1
msg1:1
msg3:1
msg5:1
msg2:1
msg4:1
msg6:1  # due to log compaction, same key will be overwrite. this is ok...
➜  kafka_2.10-0.10.0.0 bin/kafka-console-consumer.sh --new-consumer
--bootstrap-server localhost:9092 --topic
streams-wordcount-Counts-changelog --property print.key=true --property
key.separator=":" --from-beginning
*msg2:*
*msg4:*
*msg6:*
*msg1:*
*msg3:*
*msg5:*
Everything is OK, except the changelog topic troubles me. Why are its values
empty?

I have dug into the source code, and here is a summary of the workflow:
1. when WordCountProcessorDemo's ProcessorNode puts a kv pair into the KeyValueStore, no
matter whether it is the Memory or RocksDB store, it first puts the entry into local
storage and then appends the msg to the StoreChangeLogger
2. the key is appended to the StoreChangeLogger first, which invokes maybeLogChange,
passing a ValueGetter; this getter will read the value from local storage when
the logChange() operation happens
3. when logChange() on the StoreChangeLogger happens, a KeyValue message is sent to
the changelog topic, here streams-wordcount-Counts-changelog
4. the StoreChangeLogger uses the dirty and removed Sets to swap between logChange()
and add().

for (K k : this.dirty) { // logChange() method flush dirty to changelog topic
    V v = getter.get(k); // value getter will get value from local storage
    collector.send(new ProducerRecord<>(this.topic, this.partition, k, v), keySerializer, valueSerializer);
}


The only reason I can figure out for an empty value is when the KeyValueStore's
delete method is invoked. But actually the application didn't do that.

Can anyone help me? Thanks.


Re: Losing messages in Kafka Streams after upgrading

2017-06-07 Thread Frank Lyaruu
I tried to use a TimestampExtractor that uses our timestamps from the
messages, and a 'map' operation on the KTable to set it to the current time, to
have a precise point where I discard our original timestamps. That does not
work (I verified by writing a separate Java Kafka consumer that spits out
the timestamps), as the TimestampExtractor only gets called once, and it
will stick with that time. I did not really have a good reason not to
simply use the WallclockTimestampExtractor, and that one seems to do exactly
what I wanted.

So, I'm good! I am interested in the community discussion Guozhang
mentions. Is there a KIP for that?

regards, Frank
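
For reference, a minimal sketch of wiring in the built-in extractor (the application id
and bootstrap servers are placeholders; the config key shown is the 0.10.x setting):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

public class WallclockConfigSketch {
    public static Properties buildConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // Stamp every record with the wall-clock time at which Streams processes it.
        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                WallclockTimestampExtractor.class.getName());
        return props;
    }
}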


On Mon, Jun 5, 2017 at 8:25 PM, Matthias J. Sax 
wrote:

> Frank,
>
> If you use "now", I assume you are calling System.currentTimeMillis().
> If yes, you can also use predefined WallclockTimestampExtractor that
> ships with Streams (no need to write your own one).
>
> > I thought that the Timestamp extractor would then also use
> >> that updated timestamp as 'stream time', but I don't really see that
> >> happening, so that assumption was wrong.
>
> Yes, this should happen. Not sure why you don't observe this. And thus,
> the producer should use this timestamp to write the records.
>
> How did you verify the timestamps that are set for your output records?
>
>
> -Matthias
>
>
> On 6/5/17 6:15 AM, Frank Lyaruu wrote:
> > Thanks Guozhang,
> >
> > I figured I could use a custom timestamp extractor, and set that
> timestamp
> > to 'now' when reading a source topic, as the original timestamp is pretty
> > much irrelevant. I thought that the Timestamp extractor would then also
> use
> > that updated timestamp as 'stream time', but I don't really see that
> > happening, so that assumption was wrong.
> >
> > If I could configure a timestamp extractor that would also be used by the
> > producer I think I'd be in business, but right now I don't see an elegant
> > way forward, so any ideas for work arounds are welcome.
> >
> > regards, Frank
> >
> > On Mon, Jun 5, 2017 at 7:01 AM, Guozhang Wang 
> wrote:
> >
> >> Frank, thanks for sharing your findings.
> >>
> >> I think this is a general issue to consider in Streams, and the
> community
> >> has been thinking about it: we write intermediate topics with the stream
> >> time that is inherited from the source topic's timestamps, however that
> >> timestamp is used for log rolling / retention etc as well, and these two
> >> purposes (use timestamps in processing for out-of-ordering and late
> >> arrivals, and operations on the Kafka topics) could rely on different
> >> timestamp semantics. We need to revisit how timestamps can be maintained
> >> across the topology in Streams.
> >>
> >> Guozhang
> >>
> >> On Sat, Jun 3, 2017 at 10:54 AM, Frank Lyaruu 
> wrote:
> >>
> >>> Hi Matthias,
> >>>
> >>> Ok, that clarifies quite a bit. I never really went into the timestamp
> >>> aspects, as time does not really play a role in my application (aside
> >> from
> >>> the repartition topics, I have no KStreams or Windowed operation, just
> >>> different kind of KTable join).
> >>>
> >>> I do think that the fail case I see (With this version joining two
> 'old'
> >>> KTables causes a small percentage of records to vanish) is far from
> >>> intuitive, and it somehow worked fine until a few weeks ago.
> >>>
> >>> I think your option 3 should work. I'll make a custom timestamp
> extractor
> >>> (I actually do have a timestamp in my messages), and I'll set it to the
> >>> current time as they enter the streams application.
> >>>
> >>> Thanks, that helped, regards, Frank
> >>>
> >>> On Fri, Jun 2, 2017 at 9:17 PM, Matthias J. Sax  >
> >>> wrote:
> >>>
>  Hi Frank,
> 
>  yes, retention policy is based on the embedded record timestamps and
> >> not
>  on system time. Thus, if you send messages with an old timestamp, they
>  can trigger log/segment rolling.
> 
> >> I see that the repartition topics have timestamp.type = CreateTime,
> >>> does
> >> that mean it uses the timestamp of the
> >> original message?
> 
>  Yes. That's the default setting on the broker side. For Streams, we
>  maintain a so-called "stream time" that is computed based on the input
>  record timestamps. This "stream time" is used to set the timestamp for
>  records that are written by Streams (so it's more or less the
> timestamp
>  of the input records).
> 
> >> Shouldn't that be LogAppendTime for repartition topics?
> 
>  No. Streams needs to preserve the original timestamp to guarantee
>  correct semantics for downstream window operations. Thus, it should be
>  CreateTime -- if you switch to LogAppendTime, you might break your
>  application logic and get wrong results.
> 
> >> Or is there a way to configure that?
> 
>  You can configure this on a per topic basis on the brokers.
> 
> >> If I hack into my Kafka streams code to force it to use
> >> LogAppendTime
>  se
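
(For reference, the per-topic setting Matthias mentions is
message.timestamp.type. A hedged sketch of changing it with the
ZooKeeper-based kafka-configs.sh tooling of that era, topic name illustrative:

bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --entity-type topics --entity-name my-repartition-topic \
  --add-config message.timestamp.type=LogAppendTime

Note, as discussed above, that switching repartition topics to LogAppendTime
can break windowed semantics downstream.)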

ACLs for regular expression

2017-06-07 Thread Ashish Bhushan
Hi,

Is it possible to define ACLs using a regular expression for group names?

For example, I want to give Read access to all group names with the prefix
DNS*.
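
For reference, granting Read on a single, literally-named group looks like the
sketch below (principal and group name are illustrative); whether a prefix
pattern such as DNS* is accepted depends on the broker version, so treat that
part as the open question.

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
  --add --allow-principal User:alice --operation Read --group DNS-group1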


Thanks