Kafka cluster fails to start if one ZooKeeper instance is not up

2017-03-06 Thread safique ahemad
Hi All,

Below are two scenarios that fail:

1) 3-node ZooKeeper cluster and 3-node Kafka cluster: one of the ZooKeeper
nodes was down for some reason when the Kafka cluster started, and none of
the Kafka instances came up. All of them fail with connectivity errors to the
downed ZooKeeper instance, even though each broker's zookeeper.connect lists
all of the ZooKeeper hosts (e.g. zookeeper.connect=zk1:2181,zk2:2181,zk3:2181).

2) 5-node ZooKeeper cluster and 3-node Kafka cluster: same behavior as
above.

I was expecting the Kafka cluster to come up, because a 3-node ZooKeeper
ensemble can tolerate 1 failure and a 5-node ensemble can tolerate 2. One
ZooKeeper instance being down should not make a difference.

Is this the expected behavior?

Please suggest.

-- 

Regards,
Safique Ahemad


Re: Fixing two critical bugs in kafka streams

2017-03-06 Thread Sachin Mittal
> As for the second issue you brought up, I agree it is indeed a bug; but
just to clarify it is the CREATION of the first task including restoring
stores that can take longer than MAX_POLL_INTERVAL_MS_CONFIG, not
processing it right

Yes, this is correct. I may have misused the terminology, so let's not
confuse it with "processing" in the Kafka Streams sense.

> From my understanding, the deadlock is "caused" by your fix of problem
one. If the thread would die, the lock would get release and no deadlock
would occur.

Well, actually the deadlock issue is different from the CommitFailedException
issue and is not caused by it. However, if we fix the deadlock, it may
potentially cause a CommitFailedException later on; but trunk already handles
that CommitFailedException, so the only issue left is the deadlock.
I actually made a single commit for both issues, so it may have looked as if
the fix for one causes the other, but they are essentially unrelated.

The deadlock issue is simply this: if CREATION of the first task (including
restoring its stores) takes longer than MAX_POLL_INTERVAL_MS_CONFIG, then the
second task in that pipeline may end up in a deadlocked state if some other
thread has already taken over that partition. So, in my view, we need some
upper bound check on backoffTimeMs.
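
A rough sketch of the idea (illustrative only, not the actual StreamThread
code; the lock object, method and constant names below are made up): keep
retrying with backoff, but give up once the total wait exceeds the poll
interval, so the thread can rejoin the group instead of spinning forever.

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.ReentrantLock;

    public class BoundedLockRetry {
        // stand-in for MAX_POLL_INTERVAL_MS_CONFIG (default 300 s)
        static final long MAX_POLL_INTERVAL_MS = 300_000L;

        // try to take the task's state lock, but never wait longer than the poll interval
        static boolean acquireWithUpperBound(ReentrantLock stateLock) throws InterruptedException {
            long backoffTimeMs = 50L;
            final long start = System.currentTimeMillis();
            while (System.currentTimeMillis() - start < MAX_POLL_INTERVAL_MS) {
                if (stateLock.tryLock(backoffTimeMs, TimeUnit.MILLISECONDS)) {
                    return true;                       // got the lock, the task can be created
                }
                backoffTimeMs = Math.min(backoffTimeMs * 2, 10_000L); // keep backing off, but cap it
            }
            return false; // give up: the thread was evicted, so it should rejoin the group
        }
    }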

Thanks
Sachin



On Tue, Mar 7, 2017 at 3:24 AM, Matthias J. Sax 
wrote:

> Thanks for your input. I now understood the first issue (and the fix).
> Still not sure about the second issue.
>
> From my understanding, the deadlock is "caused" by your fix of problem
> one. If the thread would die, the lock would get release and no deadlock
> would occur.
>
> However, because the thread does not die, it also does not release its
> locks. Thus, from my understanding, in case of a CommitFailedException,
> the thread must release its locks, too. This would resolve the deadlock
> -- we don't need any timeout for this.
>
> Or do I still miss understand the issue?
>
>
> -Matthias
>
>
> On 3/6/17 11:41 AM, Guozhang Wang wrote:
> > Hello Sachin,
> >
> > Thanks for your finds!! Just to add what Damian said regarding 1), in
> > KIP-129 where we are introducing exactly-once processing semantics to
> > Streams we have also described different categories of error handling for
> > exactly-once. Commit exceptions due to rebalance will be handled as
> > "producer fenced" which will not be thrown to the users as we already did
> > in trunk.
> >
> > As for the second issue you brought up, I agree it is indeed a bug; but
> > just to clarify it is the CREATION of the first task including restoring
> > stores that can take longer than MAX_POLL_INTERVAL_MS_CONFIG, not
> > processing it right (we are only start processing until all tasks have
> been
> > created and initialized)?
> >
> >
> >
> > Guozhang
> >
> >
> > On Mon, Mar 6, 2017 at 7:16 AM, Sachin Mittal 
> wrote:
> >
> >> Please find the JIRA https://issues.apache.org/jira/browse/KAFKA-4848
> >>
> >>
> >> On Mon, Mar 6, 2017 at 5:20 PM, Damian Guy 
> wrote:
> >>
> >>> Hi Sachin,
> >>>
> >>> If it is a bug then please file a JIRA for it, too.
> >>>
> >>> Thanks,
> >>> Damian
> >>>
> >>> On Mon, 6 Mar 2017 at 11:23 Sachin Mittal  wrote:
> >>>
>  Ok that's great.
>  So you have already fixed that issue.
> 
>  I have modified my PR to remove that change (which was done keeping
>  0.10.2.0 in mind).
> 
>  However the other issue is still valid.
> 
>  Please review that change. https://github.com/apache/kafka/pull/2642
> 
> 
>  Thanks
>  Sachin
> 
> 
>  On Mon, Mar 6, 2017 at 3:56 PM, Damian Guy 
> >> wrote:
> 
> > On trunk the CommitFailedException isn't thrown anymore. The
>  commitOffsets
> > method doesn't throw an exception. It returns one if it was thrown.
> >> We
>  used
> > to throw this exception during suspendTasksAndState, but we don't
>  anymore.
> >
> > On Mon, 6 Mar 2017 at 05:04 Sachin Mittal 
> >> wrote:
> >
> >> Hi
> >> On CommitFailedException at onPartitionsRevoked if it is thrown it
> >>> gets
> >> assigned to rebalanceException.
> >> This causes the stream thread to shutdown. I am not sure how we can
> > resume
> >> the thread.
> >>
> >> Note thread is not in invalid state because because it already has
> >>> been
> >> assigned new partitions and this exception happens when trying to
>  revoke
> >> old partitions which have been moved to some other thread, so we
> >> need
>  to
> >> swallow this exception at the StreanThread side too, just like we
>  swallow
> >> it at ConsumerCoordinator.java
> >>
> >> Also I fixed this against code base 0.10.2.0 and the difference in
> >>> that
> > vs
> >> trunk code is these lines
> >> 10.2.0
> >>if (firstException.get() == null) {
> >> 

Re: Performance and encryption

2017-03-06 Thread Ismael Juma
Hi Todd,

I agree that KAFKA-2561 would be good to have for the reasons you state.

Ismael

On Mon, Mar 6, 2017 at 5:17 PM, Todd Palino  wrote:

> Thanks for the link, Ismael. I had thought that the most recent kernels
> already implemented this, but I was probably confusing it with BSD. Most of
> my systems are stuck in the stone age right now anyway.
>
> It would be nice to get KAFKA-2561 in, either way. First off, if you can
> take advantage of it it’s a good performance boost. Second, especially with
> the security landscape getting worse and worse, it would be good to have
> options as far as the TLS implementation goes. A zero-day exploit in the
> Java TLS implementation would be devastating, and more difficult to react
> to as it would require a new JRE (bringing with it who knows what
> problems). Swapping an underlying OpenSSL version would be much more
> palatable.
>
> -Todd
>
>
> On Mon, Mar 6, 2017 at 9:01 AM, Ismael Juma  wrote:
>
> > Even though OpenSSL is much faster than the Java 8 TLS implementation (I
> > haven't tested against Java 9, which is much faster than Java 8, but
> > probably still slower than OpenSSL), all the tests were without zero copy
> > in the sense that is being discussed here (i.e. sendfile). To benefit
> from
> > sendfile with TLS, kernel-level changes/modules are required:
> >
> > https://github.com/ktls/af_ktls
> > http://www.phoronix.com/scan.php?page=news_item=FreeBSD-
> Faster-Sendfile
> >
> > Ismael
> >
> > On Mon, Mar 6, 2017 at 4:18 PM, Todd Palino  wrote:
> >
> > > So that’s not quite true, Hans. First, as far as the performance hit
> > being
> > > not a big impact (25% is huge). Or that it’s to be expected. Part of
> the
> > > problem is that the Java TLS implementation does not support zero copy.
> > > OpenSSL does, and in fact there’s been a ticket open to allow Kafka to
> > > support using OpenSSL for a while now:
> > >
> > > https://issues.apache.org/jira/browse/KAFKA-2561
> > >
> > >
> > >
> > >
> > > On Mon, Mar 6, 2017 at 6:30 AM, Hans Jespersen 
> > wrote:
> > >
> > > >
> > > > Its not a single message at a time that is encrypted with TLS its the
> > > > entire network byte stream so a Kafka broker can’t even see the Kafka
> > > > Protocol tunneled inside TLS unless it’s terminated at the broker.
> > > > It is true that losing the zero copy optimization impacts performance
> > > > somewhat  but it’s not what I would call a “big impact” because Kafka
> > > does
> > > > a lot of other things to get it’s performance (like using page cache
> > and
> > > > doing lots on sequential disk I/O). The difference should be
> something
> > in
> > > > the order of 25-30% slower with TLS enabled which is about what you
> > would
> > > > see with any other messaging protocol with TLS on vs off.
> > > >
> > > > If you wanted to encrypt each message independently before sending to
> > > > Kafka then zero copy would still be in effect and all the consumers
> > would
> > > > get the same encrypted message (and have to understand how to decrypt
> > > it).
> > > >
> > > > -hans
> > > >
> > > >
> > > >
> > > > > On Mar 6, 2017, at 5:38 AM, Nicolas Motte 
> > > wrote:
> > > > >
> > > > > Hi everyone,
> > > > >
> > > > > I understand one of the reasons why Kafka is performant is by using
> > > > > zero-copy.
> > > > >
> > > > > I often hear that when encryption is enabled, then Kafka has to
> copy
> > > the
> > > > > data in user space to decode the message, so it has a big impact on
> > > > > performance.
> > > > >
> > > > > If it is true, I don t get why the message has to be decoded by
> > Kafka.
> > > I
> > > > > would assume that whether the message is encrypted or not, Kafka
> > simply
> > > > > receives it, appends it to the file, and when a consumer wants to
> > read
> > > > it,
> > > > > it simply reads at the right offset...
> > > > >
> > > > > Also I m wondering if it s the case if we don t use keys (pure
> > queuing
> > > > > system with key=null).
> > > > >
> > > > > Cheers
> > > > > Nico
> > > >
> > > >
> > >
> > >
> > > --
> > > *Todd Palino*
> > > Staff Site Reliability Engineer
> > > Data Infrastructure Streaming
> > >
> > >
> > >
> > > linkedin.com/in/toddpalino
> > >
> >
>
>
>
> --
> *Todd Palino*
> Staff Site Reliability Engineer
> Data Infrastructure Streaming
>
>
>
> linkedin.com/in/toddpalino
>


Re: Clarification on min.insync.replicas​

2017-03-06 Thread Todd Palino
Default broker configurations do not show up in the topic overrides (which is
what you are seeing with the topics tool). It is more accurate to say that the
min.insync.replicas setting in your server.properties file is what will apply
to every topic (regardless of when it is created) for which there is no
topic-level override of that config.

-Todd


On Mon, Mar 6, 2017 at 4:38 PM, Shrikant Patel  wrote:

> Hi All,
>
> Need details about min.insync.replicas​ in the server.properties.
>
> I thought once I add this to server.properties, all subsequent topic
> create should have this as default value.
>
> C:\JAVA_INSTALLATION\kafka\kafka_2.11-0.10.1.1>bin\windows\kafka-topics.bat
> --zookeeper localhost:2181/chroot/cluster1 --create --topic test
> --partition 3 --replication-factor 3
> Created topic "test".
>
> C:\JAVA_INSTALLATION\kafka\kafka_2.11-0.10.1.1>bin\windows\kafka-topics.bat
> --zookeeper localhost:2181/chroot/cluster1 --describe --topic test
> Topic:test  PartitionCount:3ReplicationFactor:3 Configs:
>
> No min.insync.replicas is set on the topic.
>
> Why do I have explicit provide this configuratoin when creating topic? So
> whats the purpose of this in server.properties??
>
> C:\JAVA_INSTALLATION\kafka\kafka_2.11-0.10.1.1>bin\windows\kafka-topics.bat
> --zookeeper localhost:2181/chroot/cluster1 --create --topic test
> --partition 3 --replication-factor 3 --config min.insync.replicas=3
> Created topic "test".
>
> C:\JAVA_INSTALLATION\kafka\kafka_2.11-0.10.1.1>bin\windows\kafka-topics.bat
> --zookeeper localhost:2181/chroot/cluster1 --describe --topic test
> Topic:test  PartitionCount:3ReplicationFactor:3
>  Configs:min.insync.replicas=3
> Topic: test Partition: 0Leader: 1   Replicas: 1,2,0
> Isr: 1,2,0
> Topic: test Partition: 1Leader: 2   Replicas: 2,0,1
> Isr: 2,0,1
> Topic: test Partition: 2Leader: 0   Replicas: 0,1,2
> Isr: 0,1,2
>
> Thanks
> Shri
>
>
>
> This e-mail and its contents (to include attachments) are the property of
> National Health Systems, Inc., its subsidiaries and affiliates, including
> but not limited to Rx.com Community Healthcare Network, Inc. and its
> subsidiaries, and may contain confidential and proprietary or privileged
> information. If you are not the intended recipient of this e-mail, you are
> hereby notified that any unauthorized disclosure, copying, or distribution
> of this e-mail or of its attachments, or the taking of any unauthorized
> action based on information contained herein is strictly prohibited.
> Unauthorized use of information contained herein may subject you to civil
> and criminal prosecution and penalties. If you are not the intended
> recipient, please immediately notify the sender by telephone at
> 800-433-5719 or return e-mail and permanently delete the original e-mail.
>



-- 
*Todd Palino*
Staff Site Reliability Engineer
Data Infrastructure Streaming



linkedin.com/in/toddpalino


Clarification on min.insync.replicas​

2017-03-06 Thread Shrikant Patel
Hi All,

I need some details about min.insync.replicas in server.properties.

I thought that once I add this to server.properties, all subsequently created
topics would have it as a default value.

C:\JAVA_INSTALLATION\kafka\kafka_2.11-0.10.1.1>bin\windows\kafka-topics.bat 
--zookeeper localhost:2181/chroot/cluster1 --create --topic test --partition 3 
--replication-factor 3
Created topic "test".

C:\JAVA_INSTALLATION\kafka\kafka_2.11-0.10.1.1>bin\windows\kafka-topics.bat 
--zookeeper localhost:2181/chroot/cluster1 --describe --topic test
Topic:test  PartitionCount:3  ReplicationFactor:3  Configs:

No min.insync.replicas is set on the topic.

Why do I have to explicitly provide this configuration when creating the
topic? What, then, is the purpose of setting it in server.properties?

C:\JAVA_INSTALLATION\kafka\kafka_2.11-0.10.1.1>bin\windows\kafka-topics.bat 
--zookeeper localhost:2181/chroot/cluster1 --create --topic test --partition 3 
--replication-factor 3 --config min.insync.replicas=3
Created topic "test".

C:\JAVA_INSTALLATION\kafka\kafka_2.11-0.10.1.1>bin\windows\kafka-topics.bat 
--zookeeper localhost:2181/chroot/cluster1 --describe --topic test
Topic:test  PartitionCount:3  ReplicationFactor:3  Configs:min.insync.replicas=3
Topic: test Partition: 0    Leader: 1   Replicas: 1,2,0 Isr: 1,2,0
Topic: test Partition: 1    Leader: 2   Replicas: 2,0,1 Isr: 2,0,1
Topic: test Partition: 2    Leader: 0   Replicas: 0,1,2 Isr: 0,1,2

Thanks
Shri



This e-mail and its contents (to include attachments) are the property of 
National Health Systems, Inc., its subsidiaries and affiliates, including but 
not limited to Rx.com Community Healthcare Network, Inc. and its subsidiaries, 
and may contain confidential and proprietary or privileged information. If you 
are not the intended recipient of this e-mail, you are hereby notified that any 
unauthorized disclosure, copying, or distribution of this e-mail or of its 
attachments, or the taking of any unauthorized action based on information 
contained herein is strictly prohibited. Unauthorized use of information 
contained herein may subject you to civil and criminal prosecution and 
penalties. If you are not the intended recipient, please immediately notify the 
sender by telephone at 800-433-5719 or return e-mail and permanently delete the 
original e-mail.


Re: Can I create user defined stream processing function/api?

2017-03-06 Thread Matthias J. Sax
Hi,

you can implement custom operators via process(), transform(), and
transformValues().

Also, if you want even more control over the topology, you can use the
low-level Processor API directly instead of the DSL:

http://docs.confluent.io/current/streams/developer-guide.html#processor-api
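
As a rough sketch (the class and its contents below are made up for
illustration, assuming the 0.10.x Processor interface), a custom operator
could look like this:

    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorContext;

    // forwards only records whose value is longer than 10 characters
    public class FilterLongValues implements Processor<String, String> {
        private ProcessorContext context;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
        }

        @Override
        public void process(String key, String value) {
            if (value != null && value.length() > 10) {
                context.forward(key, value);   // emit the record downstream
            }
        }

        @Override
        public void punctuate(long timestamp) { }  // no scheduled work in this example

        @Override
        public void close() { }
    }

You would hook it in with something like stream.process(FilterLongValues::new);
note that process() is a terminal operation, so use transform() or
transformValues() if you need to keep chaining DSL operators afterwards.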


-Matthias

On 3/6/17 3:33 PM, Wang LongTian wrote:
> Dear folks,
> 
> Background: I'm leaning Kafka stream and want to use that in my product for 
> real time streaming process with data from various sensors.
> 
> Question: 
> 1. Can I define my own processing function/api in Kafka stream except the 
> predefined functions like groupby(), count() etc.?
> 2. If I could define my own function, could you please specify a example or 
> introduce me a github project for example?
> 
> Thank u in advance!
> 
> Regards,
> Long Tian
> 





Can I create user defined stream processing function/api?

2017-03-06 Thread Wang LongTian
Dear folks,

Background: I'm learning Kafka Streams and want to use it in my product for
real-time stream processing of data from various sensors.

Questions:
1. Can I define my own processing function/API in Kafka Streams, beyond the
predefined functions like groupBy(), count(), etc.?
2. If I can define my own function, could you please point me to an example or
a GitHub project that shows one?

Thank you in advance!

Regards,
Long Tian

MirrorMaker and producers

2017-03-06 Thread Jack Foy
Hey, all. Is there any general guidance around using mirrored topics
in the context of a cluster migration?

We're moving operations from one data center to another, and we want
to stream mirrored data from the old cluster to the new, migrate
consumers, then migrate producers.

Our basic question is whether it's safe for us to commingle mirrored
and directly-produced data in the same topic, even serially. In other
words, is the following procedure safe? Why or why not?

- Data is produced to topic T on cluster A
- Topic T is mirrored to cluster B
- Consumers run against T on cluster B
- Producers gradually migrate from A to B

We've found the following, which seems to suggest no, but doesn't
address the point directly:
http://events.linuxfoundation.org/sites/events/files/slides/Kafka%20At%20Scale.pdf

-- 
Jack Foy 


Re: Kafka Kerberos Ansible

2017-03-06 Thread Mudit Agarwal
Thanks, Le. However, my cluster is kerberized.

  From: Le Cyberian 
 To: Mudit Agarwal  
 Sent: Monday, 6 March 2017 9:24 PM
 Subject: Re: Kafka Kerberos Ansible
   
Hi Mudit,

I guess its more related to Ansible rather than Kafka itself, However i
will try to answer.

Since Ansible uses SSH and you already have passwordless ssh between
ansible host (which executes playbooks) to Kafka Cluster.

You can simply use ansible command or shell module to get the list of
topics available in the respective group.

For example: bin/kafka-consumer-groups.sh --new-consumer --describe --group
default --bootstrap-server localhost:9092

You can use above to get list of topics available along with some lag which
it might be behind if processing the pipeline.

I am not sure how listing topics would help you in your ansible role/task,
maybe you are using assert or something else to check something.

BR,

Le

On Mon, Mar 6, 2017 at 9:57 PM, Mudit Agarwal  wrote:

> Let me reframe the questions.
>
> How can i list the topics using ansible script from ansible host which is
> outside the kafka cluster.
> My kafka cluster is kerberized.
> Kafka and ansible are passwordless ssh.
>
> Thanks,
> Mudit
>
>
> --
> *From:* Le Cyberian 
> *To:* users@kafka.apache.org; Mudit Agarwal 
> *Sent:* Monday, 6 March 2017 6:46 PM
> *Subject:* Re: Kafka Kerberos Ansible
>
> Hi Mudit,
>
> What do you mean by accessing Kafka cluster outside Ansible VM ? It needs
> to listen to a interface which is available for the network outside of the
> VM
>
> BR,
>
> Lee
>
> On Mon, Mar 6, 2017 at 7:42 PM, Mudit Agarwal 
> wrote:
>
> Hi,
> How we can access the kafka cluster from an outside Ansible VM.The kafka
> is kerberiszed.All linux environment.
> Thanks,Mudit
>
>
>
>
>
>
>
>
>


   

Re: Fixing two critical bugs in kafka streams

2017-03-06 Thread Matthias J. Sax
Thanks for your input. I now understand the first issue (and the fix).
I'm still not sure about the second issue.

From my understanding, the deadlock is "caused" by your fix for problem
one. If the thread died, the lock would be released and no deadlock
would occur.

However, because the thread does not die, it also does not release its
locks. Thus, from my understanding, in case of a CommitFailedException,
the thread must release its locks, too. This would resolve the deadlock
-- we don't need any timeout for this.
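
In other words, something like the following pattern (illustrative only, not
the actual StreamThread code; the lock and helper names are placeholders):

    import java.util.concurrent.locks.ReentrantLock;

    public class RevokeAndRelease {
        private final ReentrantLock taskStateLock = new ReentrantLock();

        void onPartitionsRevoked() {
            taskStateLock.lock();
            try {
                commitOffsets();                      // may fail during a rebalance
            } catch (RuntimeException commitFailed) { // stand-in for CommitFailedException
                // swallow it: the partitions have already moved to another thread
            } finally {
                taskStateLock.unlock();               // always hand the lock back
            }
        }

        private void commitOffsets() { /* placeholder for the real commit */ }
    }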

Or do I still misunderstand the issue?


-Matthias


On 3/6/17 11:41 AM, Guozhang Wang wrote:
> Hello Sachin,
> 
> Thanks for your finds!! Just to add what Damian said regarding 1), in
> KIP-129 where we are introducing exactly-once processing semantics to
> Streams we have also described different categories of error handling for
> exactly-once. Commit exceptions due to rebalance will be handled as
> "producer fenced" which will not be thrown to the users as we already did
> in trunk.
> 
> As for the second issue you brought up, I agree it is indeed a bug; but
> just to clarify it is the CREATION of the first task including restoring
> stores that can take longer than MAX_POLL_INTERVAL_MS_CONFIG, not
> processing it right (we are only start processing until all tasks have been
> created and initialized)?
> 
> 
> 
> Guozhang
> 
> 
> On Mon, Mar 6, 2017 at 7:16 AM, Sachin Mittal  wrote:
> 
>> Please find the JIRA https://issues.apache.org/jira/browse/KAFKA-4848
>>
>>
>> On Mon, Mar 6, 2017 at 5:20 PM, Damian Guy  wrote:
>>
>>> Hi Sachin,
>>>
>>> If it is a bug then please file a JIRA for it, too.
>>>
>>> Thanks,
>>> Damian
>>>
>>> On Mon, 6 Mar 2017 at 11:23 Sachin Mittal  wrote:
>>>
 Ok that's great.
 So you have already fixed that issue.

 I have modified my PR to remove that change (which was done keeping
 0.10.2.0 in mind).

 However the other issue is still valid.

 Please review that change. https://github.com/apache/kafka/pull/2642


 Thanks
 Sachin


 On Mon, Mar 6, 2017 at 3:56 PM, Damian Guy 
>> wrote:

> On trunk the CommitFailedException isn't thrown anymore. The
 commitOffsets
> method doesn't throw an exception. It returns one if it was thrown.
>> We
 used
> to throw this exception during suspendTasksAndState, but we don't
 anymore.
>
> On Mon, 6 Mar 2017 at 05:04 Sachin Mittal 
>> wrote:
>
>> Hi
>> On CommitFailedException at onPartitionsRevoked if it is thrown it
>>> gets
>> assigned to rebalanceException.
>> This causes the stream thread to shutdown. I am not sure how we can
> resume
>> the thread.
>>
>> Note thread is not in invalid state because because it already has
>>> been
>> assigned new partitions and this exception happens when trying to
 revoke
>> old partitions which have been moved to some other thread, so we
>> need
 to
>> swallow this exception at the StreanThread side too, just like we
 swallow
>> it at ConsumerCoordinator.java
>>
>> Also I fixed this against code base 0.10.2.0 and the difference in
>>> that
> vs
>> trunk code is these lines
>> 10.2.0
>>if (firstException.get() == null) {
>> firstException.set(commitOffsets());
>>}
>>  vs trunk
>> if (firstException.get() == null) {
>> // TODO: currently commit failures will not be thrown
>> to
> users
>> // while suspending tasks; this need to be re-visit
>> after
>> KIP-98
>> commitOffsets();
>> }
>> I am again not sure since this part is still a TODO, but looking at
 code
> I
>> see that commitOffsets can still throw the CommitFailedException
>>> which
>> needs to be handled at onPartitionsRevoked.
>>
>> Hope this makes sense.
>>
>> On second issue, the deadlock is not caused by
>> CommitFailedExceptio,
 but
>> after fixing the deadlock we need to make sure thread does not die
>>> due
 to
>> unhandled CommitFailedException at onPartitionsRevoked.
>> The deadlock issue is like this.
>> If a thread has two partitions and while processing partition one
>> it
> takes
>> more than MAX_POLL_INTERVAL_MS_CONFIG time, then this thread is
>>> evicted
>> from the group and both partitions are now migrated to some other
 thread.
>> Now when it tries to process the partition two it tries to get the
>>> lock
> to
>> rocks db. It won't get the lock since that partition is now moved
>> to
 some
>> other thread. So it keeps increasing the backoffTimeMs and keeps
>>> trying
> to
>> get the lock forever. This reaching a deadlock.
>> To fix this we need some upper bound of the time limit till it
>> tries
>>> to

Re: Kafka Kerberos Ansible

2017-03-06 Thread Le Cyberian
Hi Mudit,

I guess this is more related to Ansible than to Kafka itself, but I will try
to answer.

Ansible uses SSH, and you already have passwordless SSH from the Ansible host
(which executes the playbooks) to the Kafka cluster.

You can simply use the Ansible command or shell module to get the list of
topics available in the respective group.

For example: bin/kafka-consumer-groups.sh --new-consumer --describe --group
default --bootstrap-server localhost:9092

You can use the above to get the list of topics in the group, along with any
lag the consumers might be behind by while processing the pipeline.

I am not sure how listing topics helps in your Ansible role/task; maybe you
are using assert or something else as a check.

BR,

Le

On Mon, Mar 6, 2017 at 9:57 PM, Mudit Agarwal  wrote:

> Let me reframe the questions.
>
> How can i list the topics using ansible script from ansible host which is
> outside the kafka cluster.
> My kafka cluster is kerberized.
> Kafka and ansible are passwordless ssh.
>
> Thanks,
> Mudit
>
>
> --
> *From:* Le Cyberian 
> *To:* users@kafka.apache.org; Mudit Agarwal 
> *Sent:* Monday, 6 March 2017 6:46 PM
> *Subject:* Re: Kafka Kerberos Ansible
>
> Hi Mudit,
>
> What do you mean by accessing Kafka cluster outside Ansible VM ? It needs
> to listen to a interface which is available for the network outside of the
> VM
>
> BR,
>
> Lee
>
> On Mon, Mar 6, 2017 at 7:42 PM, Mudit Agarwal 
> wrote:
>
> Hi,
> How we can access the kafka cluster from an outside Ansible VM.The kafka
> is kerberiszed.All linux environment.
> Thanks,Mudit
>
>
>
>
>
>
>
>
>


3 node Kafka cluster with 3 node ZK ensemble

2017-03-06 Thread Mich Talebzadeh
Hi,

Assume that we are building ZK and Kafka as a messaging system.

We are considering two scenarios.


   1. Deploy 3 physical hosts (as opposed to VMs) and create a Kafka cluster
   there. On the same physical hosts, create a three-node ZK ensemble.
   2. Deploy 3 physical hosts (as opposed to VMs) and create a Kafka cluster
   there. Deploy three additional physical hosts and put ZK on them.


Option 2 will obviously be more expensive, and I was wondering what
additional benefits it offers besides adding to the cost.


There is also another dimension to this. Kafka reads and writes are normally
sequential, and on newer versions of Kafka (say 0.9 or above) the offsets are
no longer stored in ZK. In contrast, ZK uses random access, which tends to
favor faster disks?


Thanks


Dr Mich Talebzadeh




LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


Re: Kafka Kerberos Ansible

2017-03-06 Thread Mudit Agarwal
Let me reframe the question.

How can I list the topics, using an Ansible script, from an Ansible host that
is outside the Kafka cluster? My Kafka cluster is kerberized, and the Kafka and
Ansible hosts have passwordless SSH between them.

Thanks,
Mudit

  From: Le Cyberian 
 To: users@kafka.apache.org; Mudit Agarwal  
 Sent: Monday, 6 March 2017 6:46 PM
 Subject: Re: Kafka Kerberos Ansible
   
Hi Mudit,

What do you mean by accessing Kafka cluster outside Ansible VM ? It needs to 
listen to a interface which is available for the network outside of the VM
BR,

Lee
On Mon, Mar 6, 2017 at 7:42 PM, Mudit Agarwal  
wrote:

Hi,
How we can access the kafka cluster from an outside Ansible VM.The kafka is 
kerberiszed.All linux environment.
Thanks,Mudit



   



   

Re: Having 4 Node Kafka Cluster

2017-03-06 Thread Le Cyberian
Hi Han,

Thank you for your response. I understand. It's not possible to have a third
rack/server room at the moment, as the requirement is to have redundancy
between the two. I already tried to get one :-/

Is it possible to have a ZooKeeper ensemble (3 nodes) in one server room and
the same in the other, and have some sort of master-master replication
between the two? Would that make sense, if it's possible? In that case both
would have the same config, and split brain theoretically should not happen.

I haven't done this ZooKeeper 3rd-node hack before :) I guess I need to play
around with it for a while to get it properly documented, functional and
tested :)

Thanks again!

Le

On Mon, Mar 6, 2017 at 8:22 PM, Hans Jespersen  wrote:

>
> Is there any way you can find a third rack/server room/power supply nearby
> just for the 1 extra zookeeper node? You don’t have to put any kafka
> brokers there, just a single zookeeper.  It’s less likely to have a 3-way
> split brain because of a network partition. It’s so much cleaner with 3
> availability zones because everything would be automatic failover. This is
> how most people run when deployed in Amazon.
>
> Baring that I would say the next best thing would be 3 zookeepers in one
> zone and 2 zookeepers in the other zone so it will auto-failover if the 2
> zk zone fails. If the 3 zk zone fails you can setup a well tested set of
> manual steps to carefully configure a 3rd zookeeper clone (which matches
> the id of one of the failed nodes) and still get your system back up and
> running. If this is not something you have done before I suggest getting a
> few days of expert consulting to have someone help you set it up, test it,
> and document the proper failover and recovery procedures.
>
> -hans
>
>
>
>
> > On Mar 6, 2017, at 10:44 AM, Le Cyberian  wrote:
> >
> > Thanks Han and Alexander for taking time out and your responses.
> >
> > I now understand the risks and the possible outcome of having the desired
> > setup.
> >
> > What would be better in your opinion to have failover (active-active)
> > between both of these server rooms to avoid switching to the clone / 3rd
> > zookeeper.
> >
> > I mean even if there are 5 nodes having 3 in one server room and 2 in
> other
> > still there would be problem related to zookeeper majority leader
> election
> > if the server room goes down that has 3 nodes.
> >
> > is there some way to achieve this ?
> >
> > Thanks again!
> >
> > Lee
> >
> > On Mon, Mar 6, 2017 at 4:16 PM, Alexander Binzberger <
> > alexander.binzber...@wingcon.com> wrote:
> >
> >> I agree on this is one cluster but having one additional ZK node per
> site
> >> does not help. (as far as I understand ZK)
> >>
> >> A 3 out of 6 is also not a majority. So I think you mean 3/5 with a
> cloned
> >> 3rd one. This would mean manually switching the cloned one for majority
> >> which can cause issues again.
> >> 1. You actually build a master/slave ZK with manually switch over.
> >> 2. While switching the clone from room to room you would have downtime.
> >> 3. If you switch on both ZK node clones at the same time (by mistake)
> you
> >> screwed.
> >> 4. If you "switch" clones instead of moving it will all data on disk you
> >> generate a split brain from which you have to recover first.
> >>
> >> So if you loose the connection between the rooms / the rooms get
> separated
> >> / you loose one room:
> >> * You (might) need manual interaction
> >> * loose automatic fail-over between the rooms
> >> * might face complete outage if your "master" room with the active 3rd
> >> node is hit.
> >> Actually this is the same scenario with 2/3 nodes spread over two
> >> locations.
> >>
> >> What you need is a third cross connected location for real fault
> tolerance
> >> and distribute your 3 or 5 ZK nodes over those.
> >> Or live with a possible outage in such a scenario.
> >>
> >> Additional Hints:
> >> * You can run any number of Kafka brokers on a ZK cluster. In your case
> >> this could be 4 Kafka brokers on 3 ZK nodes.
> >> * You should set topic replication to 2 (can be done at any time) and
> some
> >> other producer/broker settings to ensure your messages will not get
> lost in
> >> switch over cases.
> >> * ZK service does not react nicely on disk full.
> >>
> >>
> >>
> >> Am 06.03.2017 um 15:10 schrieb Hans Jespersen:
> >>
> >>> In that case it’s really one cluster. Make sure to set different rack
> ids
> >>> for each server room so kafka will ensure that the replicas always span
> >>> both floors and you don’t loose availability of data if a server room
> goes
> >>> down.
> >>> You will have to configure one addition zookeeper node in each site
> which
> >>> you will only ever startup if a site goes down because otherwise 2 of 4
> >>> zookeeper nodes is not a quorum.Again you would be better with 3 nodes
> >>> because then you would only have to do this in the site that has the
> single
> >>> active node.
> >>>
> >>> -hans

Re: Fixing two critical bugs in kafka streams

2017-03-06 Thread Guozhang Wang
Hello Sachin,

Thanks for your findings!! Just to add to what Damian said regarding 1): in
KIP-129, where we are introducing exactly-once processing semantics to
Streams, we have also described different categories of error handling for
exactly-once. Commit exceptions due to a rebalance will be handled as
"producer fenced" and will not be thrown to the users, as we already do
in trunk.

As for the second issue you brought up, I agree it is indeed a bug; but
just to clarify, it is the CREATION of the first task, including restoring its
stores, that can take longer than MAX_POLL_INTERVAL_MS_CONFIG, not processing
it, right? (We only start processing once all tasks have been created and
initialized.)



Guozhang


On Mon, Mar 6, 2017 at 7:16 AM, Sachin Mittal  wrote:

> Please find the JIRA https://issues.apache.org/jira/browse/KAFKA-4848
>
>
> On Mon, Mar 6, 2017 at 5:20 PM, Damian Guy  wrote:
>
> > Hi Sachin,
> >
> > If it is a bug then please file a JIRA for it, too.
> >
> > Thanks,
> > Damian
> >
> > On Mon, 6 Mar 2017 at 11:23 Sachin Mittal  wrote:
> >
> > > Ok that's great.
> > > So you have already fixed that issue.
> > >
> > > I have modified my PR to remove that change (which was done keeping
> > > 0.10.2.0 in mind).
> > >
> > > However the other issue is still valid.
> > >
> > > Please review that change. https://github.com/apache/kafka/pull/2642
> > >
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > > On Mon, Mar 6, 2017 at 3:56 PM, Damian Guy 
> wrote:
> > >
> > > > On trunk the CommitFailedException isn't thrown anymore. The
> > > commitOffsets
> > > > method doesn't throw an exception. It returns one if it was thrown.
> We
> > > used
> > > > to throw this exception during suspendTasksAndState, but we don't
> > > anymore.
> > > >
> > > > On Mon, 6 Mar 2017 at 05:04 Sachin Mittal 
> wrote:
> > > >
> > > > > Hi
> > > > > On CommitFailedException at onPartitionsRevoked if it is thrown it
> > gets
> > > > > assigned to rebalanceException.
> > > > > This causes the stream thread to shutdown. I am not sure how we can
> > > > resume
> > > > > the thread.
> > > > >
> > > > > Note thread is not in invalid state because because it already has
> > been
> > > > > assigned new partitions and this exception happens when trying to
> > > revoke
> > > > > old partitions which have been moved to some other thread, so we
> need
> > > to
> > > > > swallow this exception at the StreanThread side too, just like we
> > > swallow
> > > > > it at ConsumerCoordinator.java
> > > > >
> > > > > Also I fixed this against code base 0.10.2.0 and the difference in
> > that
> > > > vs
> > > > > trunk code is these lines
> > > > > 10.2.0
> > > > >if (firstException.get() == null) {
> > > > > firstException.set(commitOffsets());
> > > > >}
> > > > >  vs trunk
> > > > > if (firstException.get() == null) {
> > > > > // TODO: currently commit failures will not be thrown
> to
> > > > users
> > > > > // while suspending tasks; this need to be re-visit
> after
> > > > > KIP-98
> > > > > commitOffsets();
> > > > > }
> > > > > I am again not sure since this part is still a TODO, but looking at
> > > code
> > > > I
> > > > > see that commitOffsets can still throw the CommitFailedException
> > which
> > > > > needs to be handled at onPartitionsRevoked.
> > > > >
> > > > > Hope this makes sense.
> > > > >
> > > > > On second issue, the deadlock is not caused by
> CommitFailedExceptio,
> > > but
> > > > > after fixing the deadlock we need to make sure thread does not die
> > due
> > > to
> > > > > unhandled CommitFailedException at onPartitionsRevoked.
> > > > > The deadlock issue is like this.
> > > > > If a thread has two partitions and while processing partition one
> it
> > > > takes
> > > > > more than MAX_POLL_INTERVAL_MS_CONFIG time, then this thread is
> > evicted
> > > > > from the group and both partitions are now migrated to some other
> > > thread.
> > > > > Now when it tries to process the partition two it tries to get the
> > lock
> > > > to
> > > > > rocks db. It won't get the lock since that partition is now moved
> to
> > > some
> > > > > other thread. So it keeps increasing the backoffTimeMs and keeps
> > trying
> > > > to
> > > > > get the lock forever. This reaching a deadlock.
> > > > > To fix this we need some upper bound of the time limit till it
> tries
> > to
> > > > get
> > > > > that lock. And that upper bound has to be
> > MAX_POLL_INTERVAL_MS_CONFIG,
> > > > > because if by that time it has not got the lock, we can see that
> this
> > > > > thread was evicted from the group and need to rejoin again to get
> new
> > > > > partitions.
> > > > >
> > > > > On JIRA issue I can create one and attach the part of logs where it
> > > keeps
> > > > > trying to get the lock with increasing backoffTimeM.
> > > > >
> > > > > Let me know if these makes sense. 

Re: Having 4 Node Kafka Cluster

2017-03-06 Thread Hans Jespersen

Is there any way you can find a third rack/server room/power supply nearby just 
for the 1 extra zookeeper node? You don’t have to put any kafka brokers there, 
just a single zookeeper.  It’s less likely to have a 3-way split brain because 
of a network partition. It’s so much cleaner with 3 availability zones because 
everything would be automatic failover. This is how most people run when 
deployed in Amazon. 

Barring that, I would say the next best thing would be 3 zookeepers in one zone
and 2 zookeepers in the other zone, so it will auto-failover if the 2-zk zone
fails. If the 3-zk zone fails you can set up a well-tested set of manual steps
to carefully configure a 3rd zookeeper clone (which matches the id of one of 
the failed nodes) and still get your system back up and running. If this is not 
something you have done before I suggest getting a few days of expert 
consulting to have someone help you set it up, test it, and document the proper 
failover and recovery procedures.

-hans




> On Mar 6, 2017, at 10:44 AM, Le Cyberian  wrote:
> 
> Thanks Han and Alexander for taking time out and your responses.
> 
> I now understand the risks and the possible outcome of having the desired
> setup.
> 
> What would be better in your opinion to have failover (active-active)
> between both of these server rooms to avoid switching to the clone / 3rd
> zookeeper.
> 
> I mean even if there are 5 nodes having 3 in one server room and 2 in other
> still there would be problem related to zookeeper majority leader election
> if the server room goes down that has 3 nodes.
> 
> is there some way to achieve this ?
> 
> Thanks again!
> 
> Lee
> 
> On Mon, Mar 6, 2017 at 4:16 PM, Alexander Binzberger <
> alexander.binzber...@wingcon.com> wrote:
> 
>> I agree on this is one cluster but having one additional ZK node per site
>> does not help. (as far as I understand ZK)
>> 
>> A 3 out of 6 is also not a majority. So I think you mean 3/5 with a cloned
>> 3rd one. This would mean manually switching the cloned one for majority
>> which can cause issues again.
>> 1. You actually build a master/slave ZK with manually switch over.
>> 2. While switching the clone from room to room you would have downtime.
>> 3. If you switch on both ZK node clones at the same time (by mistake) you
>> screwed.
>> 4. If you "switch" clones instead of moving it will all data on disk you
>> generate a split brain from which you have to recover first.
>> 
>> So if you loose the connection between the rooms / the rooms get separated
>> / you loose one room:
>> * You (might) need manual interaction
>> * loose automatic fail-over between the rooms
>> * might face complete outage if your "master" room with the active 3rd
>> node is hit.
>> Actually this is the same scenario with 2/3 nodes spread over two
>> locations.
>> 
>> What you need is a third cross connected location for real fault tolerance
>> and distribute your 3 or 5 ZK nodes over those.
>> Or live with a possible outage in such a scenario.
>> 
>> Additional Hints:
>> * You can run any number of Kafka brokers on a ZK cluster. In your case
>> this could be 4 Kafka brokers on 3 ZK nodes.
>> * You should set topic replication to 2 (can be done at any time) and some
>> other producer/broker settings to ensure your messages will not get lost in
>> switch over cases.
>> * ZK service does not react nicely on disk full.
>> 
>> 
>> 
>> Am 06.03.2017 um 15:10 schrieb Hans Jespersen:
>> 
>>> In that case it’s really one cluster. Make sure to set different rack ids
>>> for each server room so kafka will ensure that the replicas always span
>>> both floors and you don’t loose availability of data if a server room goes
>>> down.
>>> You will have to configure one addition zookeeper node in each site which
>>> you will only ever startup if a site goes down because otherwise 2 of 4
>>> zookeeper nodes is not a quorum.Again you would be better with 3 nodes
>>> because then you would only have to do this in the site that has the single
>>> active node.
>>> 
>>> -hans
>>> 
>>> 
>>> On Mar 6, 2017, at 5:57 AM, Le Cyberian  wrote:
 
 Hi Hans,
 
 Thank you for your reply.
 
 Its basically two different server rooms on different floors and they are
 connected with fiber connectivity so its almost like a local connection
 between them no network latencies / lag.
 
 If i do a Mirror Maker / Replicator then i will not be able to use them
 at
 the same time for writes./ producers. because the consumers / producers
 will request from all of them
 
 BR,
 
 Lee
 
 On Mon, Mar 6, 2017 at 2:50 PM, Hans Jespersen 
 wrote:
 
 What do you mean when you say you have "2 sites not datacenters"? You
> should be very careful configuring a stretch cluster across multiple
> sites.
> What is the RTT between the two sites? Why do you think that MIrror
> Maker
> (or 

Re: Kafka Kerberos Ansible

2017-03-06 Thread Le Cyberian
Hi Mudit,

What do you mean by accessing the Kafka cluster from outside the Ansible VM?
It needs to listen on an interface which is reachable from the network outside
of the VM.

BR,

Lee

On Mon, Mar 6, 2017 at 7:42 PM, Mudit Agarwal 
wrote:

> Hi,
> How we can access the kafka cluster from an outside Ansible VM.The kafka
> is kerberiszed.All linux environment.
> Thanks,Mudit
>
>
>
>


Re: Having 4 Node Kafka Cluster

2017-03-06 Thread Le Cyberian
Thanks Han and Alexander for taking the time out, and for your responses.

I now understand the risks and the possible outcome of the desired setup.

In your opinion, what would be a better way to have failover (active-active)
between both of these server rooms, so as to avoid switching to the clone /
3rd zookeeper?

I mean, even with 5 nodes (3 in one server room and 2 in the other), there
would still be a problem with ZooKeeper majority leader election if the
server room that has the 3 nodes goes down.

Is there some way to achieve this?

Thanks again!

Lee

On Mon, Mar 6, 2017 at 4:16 PM, Alexander Binzberger <
alexander.binzber...@wingcon.com> wrote:

> I agree on this is one cluster but having one additional ZK node per site
> does not help. (as far as I understand ZK)
>
> A 3 out of 6 is also not a majority. So I think you mean 3/5 with a cloned
> 3rd one. This would mean manually switching the cloned one for majority
> which can cause issues again.
> 1. You actually build a master/slave ZK with manually switch over.
> 2. While switching the clone from room to room you would have downtime.
> 3. If you switch on both ZK node clones at the same time (by mistake) you
> screwed.
> 4. If you "switch" clones instead of moving it will all data on disk you
> generate a split brain from which you have to recover first.
>
> So if you loose the connection between the rooms / the rooms get separated
> / you loose one room:
> * You (might) need manual interaction
> * loose automatic fail-over between the rooms
> * might face complete outage if your "master" room with the active 3rd
> node is hit.
> Actually this is the same scenario with 2/3 nodes spread over two
> locations.
>
> What you need is a third cross connected location for real fault tolerance
> and distribute your 3 or 5 ZK nodes over those.
> Or live with a possible outage in such a scenario.
>
> Additional Hints:
> * You can run any number of Kafka brokers on a ZK cluster. In your case
> this could be 4 Kafka brokers on 3 ZK nodes.
> * You should set topic replication to 2 (can be done at any time) and some
> other producer/broker settings to ensure your messages will not get lost in
> switch over cases.
> * ZK service does not react nicely on disk full.
>
>
>
> Am 06.03.2017 um 15:10 schrieb Hans Jespersen:
>
>> In that case it’s really one cluster. Make sure to set different rack ids
>> for each server room so kafka will ensure that the replicas always span
>> both floors and you don’t loose availability of data if a server room goes
>> down.
>> You will have to configure one addition zookeeper node in each site which
>> you will only ever startup if a site goes down because otherwise 2 of 4
>> zookeeper nodes is not a quorum.Again you would be better with 3 nodes
>> because then you would only have to do this in the site that has the single
>> active node.
>>
>> -hans
>>
>>
>> On Mar 6, 2017, at 5:57 AM, Le Cyberian  wrote:
>>>
>>> Hi Hans,
>>>
>>> Thank you for your reply.
>>>
>>> Its basically two different server rooms on different floors and they are
>>> connected with fiber connectivity so its almost like a local connection
>>> between them no network latencies / lag.
>>>
>>> If i do a Mirror Maker / Replicator then i will not be able to use them
>>> at
>>> the same time for writes./ producers. because the consumers / producers
>>> will request from all of them
>>>
>>> BR,
>>>
>>> Lee
>>>
>>> On Mon, Mar 6, 2017 at 2:50 PM, Hans Jespersen 
>>> wrote:
>>>
>>> What do you mean when you say you have "2 sites not datacenters"? You
 should be very careful configuring a stretch cluster across multiple
 sites.
 What is the RTT between the two sites? Why do you think that MIrror
 Maker
 (or Confluent Replicator) would not work between the sites and yet you
 think a stretch cluster will work? That seems wrong.

 -hans

 /**
 * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
 * h...@confluent.io (650)924-2670
 */

 On Mon, Mar 6, 2017 at 5:37 AM, Le Cyberian 
 wrote:

 Hi Guys,
>
> Thank you very much for you reply.
>
> The scenario which i have to implement is that i have 2 sites not
> datacenters so mirror maker would not work here.
>
> There will be 4 nodes in total, like 2 in Site A and 2 in Site B. The
>
 idea

> is to have Active-Active setup along with fault tolerance so that if
> one
>
 of

> the site goes on the operations are normal.
>
> In this case if i go ahead with 4 node-cluster of both zookeeper and
>
 kafka

> it will give failover tolerance for 1 node only.
>
> What do you suggest to do in this case ? because to divide between 2
>
 sites

> it needs to be even number if that makes sense ? Also if possible some
>
 help

> regarding partitions for topic and replication 

Kafka Kerberos Ansible

2017-03-06 Thread Mudit Agarwal
Hi,

How can we access the Kafka cluster from an outside Ansible VM? The Kafka
cluster is kerberized, and it is an all-Linux environment.

Thanks,
Mudit

 
 
   

Re: Kafka streams questions

2017-03-06 Thread Matthias J. Sax
Yes, that is the parameter I was referring to.

And yes, you can set consumer/producer config via StreamsConfig.
However, it's recommended to use

> props.put(StreamsConfig.consumerPrefix("consumer.parameter.name"), value);
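
For example (a minimal sketch; the application id, broker address, topic
names, and the 30-second value are placeholders, and consumerPrefix()
assumes 0.10.2 or later):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    public class StreamsConfigExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // forward the consumer's metadata.max.age.ms through the Streams config
            props.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "30000");

            KStreamBuilder builder = new KStreamBuilder();
            builder.stream("input-topic").to("output-topic");   // trivial pass-through topology

            KafkaStreams streams = new KafkaStreams(builder, props);
            streams.start();
        }
    }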


-Matthias



On 3/6/17 6:48 AM, Neil Moore wrote:
> Thanks for the answers, Matthias.
> 
> 
> You mention a metadata refresh interval. I see Kafka producers and consumers 
> have a property called metadata.max.age.ms which sounds similar. From the 
> documentation and looking at the Javadoc for Kafka streams it is not clear to 
> me how I can affect KafkaStreams' discovery of topics and partitions. It is 
> by configuring consumers using the Properties/StreamsConfig object passed to 
> KafkaStreams' constructor? I.e. something like
> 
> 
> props.put(StreamsConfig.CONSUMER_PREFIX + "metadata.max.age.ms", 1)
> 
> ..
> 
> KafkaStreams streams = new KafkaStreams(blah, props)
> 
> 
> Thanks,
> 
> 
> Neil
> 
> 
> From: Matthias J. Sax 
> Sent: 28 February 2017 22:26:39
> To: users@kafka.apache.org
> Subject: Re: Kafka streams questions
> 
> Adding partitions:
> 
> You should not add partitions at runtime -- it might break the semantics
> of your application because is might "mess up" you hash partitioning.
> Cf.
> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowtoscaleaStreamsapp,i.e.,increasenumberofinputpartitions?
> 
> 
> If you are sure, that this is not a concern, because you don't do any
> stateful operation (or some _manual_ re-partitioning within your
> application before any key-based operation), than Streams should pick up
> added partitions automatically. This can take multiple minutes depending
> on your metadata refresh interval (default is 5 minutes).
> 
> 
> About rewinding consumer partition offsets:
> 
> There is no tool to do this right now. But there is a KIP:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-122%3A+Add+Reset+Consumer+Group+Offsets+tooling
> 
> For now, you could write you own little Java program that manipulate the
> offsets before you start your Streams application. However, be aware the
> this will result in duplicate processing as there is currently no way to
> reset your state stores.
> 
> 
> -Matthias
> 
> 
> 
> 
> On 2/28/17 1:31 PM, Neil Moore wrote:
>> Hello,
>>
>>
>> I have a few questions that I couldn't find answers to in the documentation:
>>
>>
>>   *   Can added partitions be auto-discovered by kafka-streams? In my 
>> informal tests I have had to restart the stream nodes.
>>   *   Is it possible to rewind the consumer for a particular 
>> topic-partitions. e.g. if I have a Processor handling a topic-partition can 
>> I rewind that a to an arbitrary offset? I saw that 
>> kafka-streams-application-reset allows all partitions to be reset, but I'd 
>> like to decide per partition.
>>
>> If anybody can shed any light on these issues I'd be most grateful.
>>
>> Thanks.
>>
> 
> 





Re: java.lang.IllegalStateException: Correlation id for response () does not match request ()

2017-03-06 Thread Ismael Juma
Hi Mickael,

This looks to be the same as KAFKA-4669. In theory, this should never
happen and it's unclear when/how it can happen. Not sure if someone has
investigated it in more detail.

Ismael

On Mon, Mar 6, 2017 at 5:15 PM, Mickael Maison 
wrote:

> Hi,
>
> In one of our clusters, some of our clients occasionally see this
> exception:
> java.lang.IllegalStateException: Correlation id for response (4564)
> does not match request (4562)
> at org.apache.kafka.clients.NetworkClient.correlate(
> NetworkClient.java:486)
> at org.apache.kafka.clients.NetworkClient.parseResponse(
> NetworkClient.java:381)
> at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(
> NetworkClient.java:449)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:229)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:134)
> at java.lang.Thread.run(Unknown Source)
>
> We've also seen it from consumer poll() and commit()
>
> Usually the response's correlation id is off by just 1 or 2 (like
> above) but we've also seen it off by a few hundreds:
> java.lang.IllegalStateException: Correlation id for response (742)
> does not match request (174)
> at org.apache.kafka.clients.NetworkClient.correlate(
> NetworkClient.java:486)
> at org.apache.kafka.clients.NetworkClient.parseResponse(
> NetworkClient.java:381)
> at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(
> NetworkClient.java:449)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.
> clientPoll(ConsumerNetworkClient.java:360)
> at org.apache.kafka.clients.consumer.internals.
> ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
> at org.apache.kafka.clients.consumer.internals.
> ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
> at org.apache.kafka.clients.consumer.internals.
> ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.
> commitOffsetsSync(ConsumerCoordinator.java:426)
> at org.apache.kafka.clients.consumer.KafkaConsumer.
> commitSync(KafkaConsumer.java:1059)
> at org.apache.kafka.clients.consumer.KafkaConsumer.
> commitSync(KafkaConsumer.java:1027)
>
> When this happens, all subsequent responses are also shifted:
> java.lang.IllegalStateException: Correlation id for response (743)
> does not match request (742)
> java.lang.IllegalStateException: Correlation id for response (744)
> does not match request (743)
> java.lang.IllegalStateException: Correlation id for response (745)
> does not match request (744)
> java.lang.IllegalStateException: Correlation id for response (746)
> does not match request (745)
>  ...
> It's easy to discard and recreate the consumer instance to recover
> however we can't do that with the producer as it occurs in the Sender
> thread.
>
> Our cluster and our clients are running Kafka 0.10.0.1.
> Under which circumstances would such an error happen ?
> Even with logging set to TRACE, we can't spot anything suspicious
> shortly before the issue. Is there any data we should try to capture
> when this happens ?
>
> Thanks!
>


Re: Performance and encryption

2017-03-06 Thread IT Consultant
Hi Todd,

Can you please help me with notes or a document on how you achieved
encryption?

I have followed the material available on the official site, but failed, as I
am no good with TLS.
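
(For context, what I am trying to get right is the client-side SSL setup,
which as far as I understand should look roughly like the sketch below; the
paths, passwords and port are placeholders.)

    // rough sketch of a TLS-enabled client config (placeholder values)
    Properties props = new Properties();
    props.put("bootstrap.servers", "broker1:9093");   // the broker's SSL listener
    props.put("security.protocol", "SSL");
    props.put("ssl.truststore.location", "/path/to/client.truststore.jks");
    props.put("ssl.truststore.password", "changeit");
    // only needed if the brokers require client authentication:
    props.put("ssl.keystore.location", "/path/to/client.keystore.jks");
    props.put("ssl.keystore.password", "changeit");
    props.put("ssl.key.password", "changeit");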

On Mar 6, 2017 19:55, "Todd Palino"  wrote:

> It’s not that Kafka has to decode it, it’s that it has to send it across
> the network. This is specific to enabling TLS support (transport
> encryption), and won’t affect any end-to-end encryption you do at the
> client level.
>
> The operation in question is called “zero copy”. In order to send a message
> batch to a consumer, the Kafka broker must read it from disk (sometimes
> it’s cached in memory, but that’s irrelevant here) and send it across the
> network. The Linux kernel allows this to happen without having to copy the
> data in memory (to move it from the disk buffers to the network buffers).
> However, if TLS is enabled, the broker must first encrypt the data going
> across the network. This means that it can no longer take advantage of the
> zero copy optimization as it has to make a copy in the process of applying
> the TLS encryption.
>
> Now, how much of an impact this has on the broker operations is up for
> debate, I think. Originally, when we ran into this problem was when TLS
> support was added to Kafka and the zero copy send for plaintext
> communications was accidentally removed as well. At the time, we saw a
> significant performance hit, and the code was patched to put it back.
> However, since then I’ve turned on inter-broker TLS in all of our clusters,
> and when we did that there was no performance hit. This is odd, because the
> replica fetchers should take advantage of the same zero copy optimization.
>
> It’s possible that it’s because it’s just one consumer (the replica
> fetchers). We’re about to start testing additional consumers over TLS, so
> we’ll see what happens at that point. All I can suggest right now is that
> you test in your environment and see what the impact is. Oh, and using
> message keys (or not) won’t matter here.
>
> -Todd
>
>
> On Mon, Mar 6, 2017 at 5:38 AM, Nicolas Motte 
> wrote:
>
> > Hi everyone,
> >
> > I understand one of the reasons why Kafka is performant is by using
> > zero-copy.
> >
> > I often hear that when encryption is enabled, then Kafka has to copy the
> > data in user space to decode the message, so it has a big impact on
> > performance.
> >
> > If it is true, I don t get why the message has to be decoded by Kafka. I
> > would assume that whether the message is encrypted or not, Kafka simply
> > receives it, appends it to the file, and when a consumer wants to read
> it,
> > it simply reads at the right offset...
> >
> > Also I m wondering if it s the case if we don t use keys (pure queuing
> > system with key=null).
> >
> > Cheers
> > Nico
> >
>
>
>
> --
> *Todd Palino*
> Staff Site Reliability Engineer
> Data Infrastructure Streaming
>
>
>
> linkedin.com/in/toddpalino
>


Re: Performance and encryption

2017-03-06 Thread Todd Palino
Thanks for the link, Ismael. I had thought that the most recent kernels
already implemented this, but I was probably confusing it with BSD. Most of
my systems are stuck in the stone age right now anyway.

It would be nice to get KAFKA-2561 in, either way. First off, if you can
take advantage of it, it’s a good performance boost. Second, especially with
the security landscape getting worse and worse, it would be good to have
options as far as the TLS implementation goes. A zero-day exploit in the
Java TLS implementation would be devastating, and more difficult to react
to as it would require a new JRE (bringing with it who knows what
problems). Swapping an underlying OpenSSL version would be much more
palatable.

-Todd


On Mon, Mar 6, 2017 at 9:01 AM, Ismael Juma  wrote:

> Even though OpenSSL is much faster than the Java 8 TLS implementation (I
> haven't tested against Java 9, which is much faster than Java 8, but
> probably still slower than OpenSSL), all the tests were without zero copy
> in the sense that is being discussed here (i.e. sendfile). To benefit from
> sendfile with TLS, kernel-level changes/modules are required:
>
> https://github.com/ktls/af_ktls
> http://www.phoronix.com/scan.php?page=news_item=FreeBSD-Faster-Sendfile
>
> Ismael
>
> On Mon, Mar 6, 2017 at 4:18 PM, Todd Palino  wrote:
>
> > So that’s not quite true, Hans. First, as far as the performance hit
> being
> > not a big impact (25% is huge). Or that it’s to be expected. Part of the
> > problem is that the Java TLS implementation does not support zero copy.
> > OpenSSL does, and in fact there’s been a ticket open to allow Kafka to
> > support using OpenSSL for a while now:
> >
> > https://issues.apache.org/jira/browse/KAFKA-2561
> >
> >
> >
> >
> > On Mon, Mar 6, 2017 at 6:30 AM, Hans Jespersen 
> wrote:
> >
> > >
> > > Its not a single message at a time that is encrypted with TLS its the
> > > entire network byte stream so a Kafka broker can’t even see the Kafka
> > > Protocol tunneled inside TLS unless it’s terminated at the broker.
> > > It is true that losing the zero copy optimization impacts performance
> > > somewhat  but it’s not what I would call a “big impact” because Kafka
> > does
> > > a lot of other things to get it’s performance (like using page cache
> and
> > > doing lots on sequential disk I/O). The difference should be something
> in
> > > the order of 25-30% slower with TLS enabled which is about what you
> would
> > > see with any other messaging protocol with TLS on vs off.
> > >
> > > If you wanted to encrypt each message independently before sending to
> > > Kafka then zero copy would still be in effect and all the consumers
> would
> > > get the same encrypted message (and have to understand how to decrypt
> > it).
> > >
> > > -hans
> > >
> > >
> > >
> > > > On Mar 6, 2017, at 5:38 AM, Nicolas Motte 
> > wrote:
> > > >
> > > > Hi everyone,
> > > >
> > > > I understand one of the reasons why Kafka is performant is by using
> > > > zero-copy.
> > > >
> > > > I often hear that when encryption is enabled, then Kafka has to copy
> > the
> > > > data in user space to decode the message, so it has a big impact on
> > > > performance.
> > > >
> > > > If it is true, I don t get why the message has to be decoded by
> Kafka.
> > I
> > > > would assume that whether the message is encrypted or not, Kafka
> simply
> > > > receives it, appends it to the file, and when a consumer wants to
> read
> > > it,
> > > > it simply reads at the right offset...
> > > >
> > > > Also I m wondering if it s the case if we don t use keys (pure
> queuing
> > > > system with key=null).
> > > >
> > > > Cheers
> > > > Nico
> > >
> > >
> >
> >
> > --
> > *Todd Palino*
> > Staff Site Reliability Engineer
> > Data Infrastructure Streaming
> >
> >
> >
> > linkedin.com/in/toddpalino
> >
>



-- 
*Todd Palino*
Staff Site Reliability Engineer
Data Infrastructure Streaming



linkedin.com/in/toddpalino


java.lang.IllegalStateException: Correlation id for response () does not match request ()

2017-03-06 Thread Mickael Maison
Hi,

In one of our clusters, some of our clients occasionally see this exception:
java.lang.IllegalStateException: Correlation id for response (4564)
does not match request (4562)
at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:486)
at org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:381)
at 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:449)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:229)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:134)
at java.lang.Thread.run(Unknown Source)

We've also seen it from consumer poll() and commit()

Usually the response's correlation id is off by just 1 or 2 (like
above) but we've also seen it off by a few hundred:
java.lang.IllegalStateException: Correlation id for response (742)
does not match request (174)
at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:486)
at 
org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:381)
at 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:449)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:426)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1059)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1027)

When this happens, all subsequent responses are also shifted:
java.lang.IllegalStateException: Correlation id for response (743)
does not match request (742)
java.lang.IllegalStateException: Correlation id for response (744)
does not match request (743)
java.lang.IllegalStateException: Correlation id for response (745)
does not match request (744)
java.lang.IllegalStateException: Correlation id for response (746)
does not match request (745)
 ...
It's easy to discard and recreate the consumer instance to recover;
however, we can't do that with the producer, as the error occurs in the
Sender thread.
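
For reference, the recover-by-recreating approach described above might look
roughly like the sketch below on the consumer side (placeholder topic and
property names; it works around the symptom, not the underlying cause):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CorrelationIdRecovery {
    // If the NetworkClient reports correlation-id drift, throw the consumer
    // away and build a fresh one from the same Properties.
    static KafkaConsumer<String, String> pollOrRecreate(KafkaConsumer<String, String> consumer,
                                                        Properties consumerProps) {
        try {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            // ... hand the records to the application, commitSync(), etc.
            return consumer;
        } catch (IllegalStateException e) {
            consumer.close();
            KafkaConsumer<String, String> fresh = new KafkaConsumer<>(consumerProps);
            fresh.subscribe(Collections.singletonList("my-topic"));   // placeholder topic
            return fresh;
        }
    }
}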

Our cluster and our clients are running Kafka 0.10.0.1.
Under which circumstances would such an error happen?
Even with logging set to TRACE, we can't spot anything suspicious
shortly before the issue. Is there any data we should try to capture
when this happens ?

Thanks!


Re: Performance and encryption

2017-03-06 Thread Ismael Juma
Even though OpenSSL is much faster than the Java 8 TLS implementation (I
haven't tested against Java 9, which is much faster than Java 8, but
probably still slower than OpenSSL), all the tests were without zero copy
in the sense that is being discussed here (i.e. sendfile). To benefit from
sendfile with TLS, kernel-level changes/modules are required:

https://github.com/ktls/af_ktls
http://www.phoronix.com/scan.php?page=news_item=FreeBSD-Faster-Sendfile

Ismael

On Mon, Mar 6, 2017 at 4:18 PM, Todd Palino  wrote:

> So that’s not quite true, Hans. First, as far as the performance hit being
> not a big impact (25% is huge). Or that it’s to be expected. Part of the
> problem is that the Java TLS implementation does not support zero copy.
> OpenSSL does, and in fact there’s been a ticket open to allow Kafka to
> support using OpenSSL for a while now:
>
> https://issues.apache.org/jira/browse/KAFKA-2561
>
>
>
>
> On Mon, Mar 6, 2017 at 6:30 AM, Hans Jespersen  wrote:
>
> >
> > Its not a single message at a time that is encrypted with TLS its the
> > entire network byte stream so a Kafka broker can’t even see the Kafka
> > Protocol tunneled inside TLS unless it’s terminated at the broker.
> > It is true that losing the zero copy optimization impacts performance
> > somewhat  but it’s not what I would call a “big impact” because Kafka
> does
> > a lot of other things to get it’s performance (like using page cache and
> > doing lots on sequential disk I/O). The difference should be something in
> > the order of 25-30% slower with TLS enabled which is about what you would
> > see with any other messaging protocol with TLS on vs off.
> >
> > If you wanted to encrypt each message independently before sending to
> > Kafka then zero copy would still be in effect and all the consumers would
> > get the same encrypted message (and have to understand how to decrypt
> it).
> >
> > -hans
> >
> >
> >
> > > On Mar 6, 2017, at 5:38 AM, Nicolas Motte 
> wrote:
> > >
> > > Hi everyone,
> > >
> > > I understand one of the reasons why Kafka is performant is by using
> > > zero-copy.
> > >
> > > I often hear that when encryption is enabled, then Kafka has to copy
> the
> > > data in user space to decode the message, so it has a big impact on
> > > performance.
> > >
> > > If it is true, I don t get why the message has to be decoded by Kafka.
> I
> > > would assume that whether the message is encrypted or not, Kafka simply
> > > receives it, appends it to the file, and when a consumer wants to read
> > it,
> > > it simply reads at the right offset...
> > >
> > > Also I m wondering if it s the case if we don t use keys (pure queuing
> > > system with key=null).
> > >
> > > Cheers
> > > Nico
> >
> >
>
>
> --
> *Todd Palino*
> Staff Site Reliability Engineer
> Data Infrastructure Streaming
>
>
>
> linkedin.com/in/toddpalino
>


Re: Performance and encryption

2017-03-06 Thread Todd Palino
So that’s not quite true, Hans. First, the performance hit is not a small
one (25% is huge), nor is it simply to be expected. Part of the
problem is that the Java TLS implementation does not support zero copy.
OpenSSL does, and in fact there’s been a ticket open to allow Kafka to
support using OpenSSL for a while now:

https://issues.apache.org/jira/browse/KAFKA-2561




On Mon, Mar 6, 2017 at 6:30 AM, Hans Jespersen  wrote:

>
> Its not a single message at a time that is encrypted with TLS its the
> entire network byte stream so a Kafka broker can’t even see the Kafka
> Protocol tunneled inside TLS unless it’s terminated at the broker.
> It is true that losing the zero copy optimization impacts performance
> somewhat  but it’s not what I would call a “big impact” because Kafka does
> a lot of other things to get it’s performance (like using page cache and
> doing lots on sequential disk I/O). The difference should be something in
> the order of 25-30% slower with TLS enabled which is about what you would
> see with any other messaging protocol with TLS on vs off.
>
> If you wanted to encrypt each message independently before sending to
> Kafka then zero copy would still be in effect and all the consumers would
> get the same encrypted message (and have to understand how to decrypt it).
>
> -hans
>
>
>
> > On Mar 6, 2017, at 5:38 AM, Nicolas Motte  wrote:
> >
> > Hi everyone,
> >
> > I understand one of the reasons why Kafka is performant is by using
> > zero-copy.
> >
> > I often hear that when encryption is enabled, then Kafka has to copy the
> > data in user space to decode the message, so it has a big impact on
> > performance.
> >
> > If it is true, I don t get why the message has to be decoded by Kafka. I
> > would assume that whether the message is encrypted or not, Kafka simply
> > receives it, appends it to the file, and when a consumer wants to read
> it,
> > it simply reads at the right offset...
> >
> > Also I m wondering if it s the case if we don t use keys (pure queuing
> > system with key=null).
> >
> > Cheers
> > Nico
>
>


-- 
*Todd Palino*
Staff Site Reliability Engineer
Data Infrastructure Streaming



linkedin.com/in/toddpalino


Re: Fixing two critical bugs in kafka streams

2017-03-06 Thread Sachin Mittal
Please find the JIRA https://issues.apache.org/jira/browse/KAFKA-4848


On Mon, Mar 6, 2017 at 5:20 PM, Damian Guy  wrote:

> Hi Sachin,
>
> If it is a bug then please file a JIRA for it, too.
>
> Thanks,
> Damian
>
> On Mon, 6 Mar 2017 at 11:23 Sachin Mittal  wrote:
>
> > Ok that's great.
> > So you have already fixed that issue.
> >
> > I have modified my PR to remove that change (which was done keeping
> > 0.10.2.0 in mind).
> >
> > However the other issue is still valid.
> >
> > Please review that change. https://github.com/apache/kafka/pull/2642
> >
> >
> > Thanks
> > Sachin
> >
> >
> > On Mon, Mar 6, 2017 at 3:56 PM, Damian Guy  wrote:
> >
> > > On trunk the CommitFailedException isn't thrown anymore. The
> > commitOffsets
> > > method doesn't throw an exception. It returns one if it was thrown. We
> > used
> > > to throw this exception during suspendTasksAndState, but we don't
> > anymore.
> > >
> > > On Mon, 6 Mar 2017 at 05:04 Sachin Mittal  wrote:
> > >
> > > > Hi
> > > > On CommitFailedException at onPartitionsRevoked: if it is thrown, it gets
> > > > assigned to rebalanceException.
> > > > This causes the stream thread to shut down. I am not sure how we can resume
> > > > the thread.
> > > >
> > > > Note the thread is not in an invalid state, because it already has been
> > > > assigned new partitions, and this exception happens when trying to revoke
> > > > old partitions which have been moved to some other thread, so we need to
> > > > swallow this exception at the StreamThread side too, just like we swallow
> > > > it at ConsumerCoordinator.java
> > > >
> > > > Also I fixed this against code base 0.10.2.0 and the difference in
> that
> > > vs
> > > > trunk code is these lines
> > > > 10.2.0
> > > >if (firstException.get() == null) {
> > > > firstException.set(commitOffsets());
> > > >}
> > > >  vs trunk
> > > > if (firstException.get() == null) {
> > > > // TODO: currently commit failures will not be thrown to
> > > users
> > > > // while suspending tasks; this need to be re-visit after
> > > > KIP-98
> > > > commitOffsets();
> > > > }
> > > > I am again not sure since this part is still a TODO, but looking at
> > code
> > > I
> > > > see that commitOffsets can still throw the CommitFailedException
> which
> > > > needs to be handled at onPartitionsRevoked.
> > > >
> > > > Hope this makes sense.
> > > >
> > > > On the second issue, the deadlock is not caused by CommitFailedException,
> > > > but after fixing the deadlock we need to make sure the thread does not die
> > > > due to an unhandled CommitFailedException at onPartitionsRevoked.
> > > > The deadlock issue is like this.
> > > > If a thread has two partitions and while processing partition one it takes
> > > > more than MAX_POLL_INTERVAL_MS_CONFIG time, then this thread is evicted
> > > > from the group and both partitions are now migrated to some other thread.
> > > > Now when it tries to process partition two it tries to get the lock to
> > > > RocksDB. It won't get the lock since that partition is now moved to some
> > > > other thread. So it keeps increasing the backoffTimeMs and keeps trying to
> > > > get the lock forever, thus reaching a deadlock.
> > > > To fix this we need some upper bound on the time it keeps trying to get
> > > > that lock. And that upper bound has to be MAX_POLL_INTERVAL_MS_CONFIG,
> > > > because if by that time it has not got the lock, we can see that this
> > > > thread was evicted from the group and needs to rejoin again to get new
> > > > partitions.
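
To make the proposal concrete, an upper bound on the lock retry could look
something like the sketch below; the helper and method names are made up for
illustration and this is not the actual StreamThread code:

public class BoundedLockRetry {
    // Illustrative only: stop retrying the state store lock once
    // max.poll.interval.ms has elapsed and let the thread rejoin instead.
    static void takeStoreLockWithUpperBound(long maxPollIntervalMs) throws InterruptedException {
        long backoffTimeMs = 50L;
        final long deadline = System.currentTimeMillis() + maxPollIntervalMs;
        while (!tryAcquireStateStoreLock()) {
            if (System.currentTimeMillis() >= deadline) {
                // By now the group has evicted this thread and the partition lives
                // elsewhere, so give up and trigger a rejoin instead of spinning forever.
                throw new IllegalStateException("could not take store lock within max.poll.interval.ms");
            }
            Thread.sleep(backoffTimeMs);
            backoffTimeMs = Math.min(backoffTimeMs * 2, 1000L);   // keep backing off, but bounded
        }
    }

    static boolean tryAcquireStateStoreLock() {
        return false;   // stand-in for the real RocksDB / state directory lock attempt
    }
}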
> > > >
> > > > On the JIRA issue: I can create one and attach the part of the logs where
> > > > it keeps trying to get the lock with increasing backoffTimeMs.
> > > >
> > > > Let me know if these make sense. Right now this is the best way we could
> > > > come up with to handle stream thread failures.
> > > >
> > > > Also on a side note I feel we need more resilient streams. If we have
> > say
> > > > configured our streams application with 4 threads and for whatever
> > > reason a
> > > > thread dies, then application should itself (or via some exposed
> > hooks),
> > > > allow to restart a new thread (because in Java I guess same thread
> > cannot
> > > > be restarted), so that number of threads always stay what one has
> > > > configured.
> > > > I think exposed hooks will be better option to do this.
> > > >
> > > > Thanks
> > > > Sachin
> > > >
> > > >
> > > >
> > > >
> > > > On Mon, Mar 6, 2017 at 5:40 AM, Matthias J. Sax <
> matth...@confluent.io
> > >
> > > > wrote:
> > > >
> > > > > Sachin,
> > > > >
> > > > > thanks a lot for contributing!
> > > > >
> > > > > Right now, I am not sure if I understand the change. On
> > > > > CommitFailedException, why can we just resume the thread? To me, it
> > > > > seems that the 

Re: Having 4 Node Kafka Cluster

2017-03-06 Thread Alexander Binzberger
I agree that this is one cluster, but having one additional ZK node per
site does not help (as far as I understand ZK).


A 3 out of 6 is also not a majority, so I think you mean 3/5 with a
cloned 3rd one. This would mean manually switching the cloned one over for
majority, which can cause issues again.

1. You actually build a master/slave ZK with manual switch-over.
2. While switching the clone from room to room you would have downtime.
3. If you switch on both ZK node clones at the same time (by mistake),
you're screwed.
4. If you "switch" clones instead of moving them with all data on disk, you
generate a split brain from which you have to recover first.


So if you lose the connection between the rooms / the rooms get
separated / you lose one room:

* You (might) need manual interaction
* lose automatic fail-over between the rooms
* might face a complete outage if your "master" room with the active 3rd
node is hit.

Actually this is the same scenario with 2/3 nodes spread over two locations.

What you need is a third cross-connected location for real fault
tolerance, distributing your 3 or 5 ZK nodes over those.

Or live with a possible outage in such a scenario.
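
The quorum arithmetic behind this is easy to check; a tiny sketch
(majority = n/2 + 1, tolerated failures = n - majority):

public class QuorumMath {
    public static void main(String[] args) {
        for (int n = 3; n <= 6; n++) {
            int majority = n / 2 + 1;                 // votes needed to stay available
            System.out.printf("ensemble=%d majority=%d tolerated failures=%d%n",
                    n, majority, n - majority);
        }
        // prints: 3 and 4 nodes both tolerate 1 failure; 5 and 6 both tolerate 2
    }
}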

Additional Hints:
* You can run any number of Kafka brokers on a ZK cluster. In your case 
this could be 4 Kafka brokers on 3 ZK nodes.
* You should set topic replication to 2 (can be done at any time) and
some other producer/broker settings to ensure your messages will not get
lost in switch-over cases (see the sketch after this list).

* The ZK service does not react nicely to a full disk.
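
A sketch of the producer/broker settings meant above (values are examples
only; the exact choice depends on how much loss versus unavailability you can
accept):

import java.util.Properties;

// Example durability-oriented producer settings (placeholder broker list).
Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092,broker4:9092");
producerProps.put("acks", "all");                 // wait for all in-sync replicas
producerProps.put("retries", "2147483647");       // keep retrying through a fail-over
producerProps.put("max.block.ms", "60000");       // how long send() may block while the cluster is degraded
// On the topic/broker side: replication factor 2 (or more), min.insync.replicas,
// and unclean.leader.election.enable=false are the usual companions.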


Am 06.03.2017 um 15:10 schrieb Hans Jespersen:

In that case it’s really one cluster. Make sure to set different rack ids for 
each server room so kafka will ensure that the replicas always span both floors 
and you don’t loose availability of data if a server room goes down.
You will have to configure one addition zookeeper node in each site which you 
will only ever startup if a site goes down because otherwise 2 of 4 zookeeper 
nodes is not a quorum.Again you would be better with 3 nodes because then you 
would only have to do this in the site that has the single active node.

-hans



On Mar 6, 2017, at 5:57 AM, Le Cyberian  wrote:

Hi Hans,

Thank you for your reply.

Its basically two different server rooms on different floors and they are
connected with fiber connectivity so its almost like a local connection
between them no network latencies / lag.

If i do a Mirror Maker / Replicator then i will not be able to use them at
the same time for writes./ producers. because the consumers / producers
will request from all of them

BR,

Lee

On Mon, Mar 6, 2017 at 2:50 PM, Hans Jespersen  wrote:


What do you mean when you say you have "2 sites not datacenters"? You
should be very careful configuring a stretch cluster across multiple sites.
What is the RTT between the two sites? Why do you think that MIrror Maker
(or Confluent Replicator) would not work between the sites and yet you
think a stretch cluster will work? That seems wrong.

-hans

/**
* Hans Jespersen, Principal Systems Engineer, Confluent Inc.
* h...@confluent.io (650)924-2670
*/

On Mon, Mar 6, 2017 at 5:37 AM, Le Cyberian  wrote:


Hi Guys,

Thank you very much for you reply.

The scenario which i have to implement is that i have 2 sites not
datacenters so mirror maker would not work here.

There will be 4 nodes in total, like 2 in Site A and 2 in Site B. The

idea

is to have Active-Active setup along with fault tolerance so that if one

of

the site goes on the operations are normal.

In this case if i go ahead with 4 node-cluster of both zookeeper and

kafka

it will give failover tolerance for 1 node only.

What do you suggest to do in this case ? because to divide between 2

sites

it needs to be even number if that makes sense ? Also if possible some

help

regarding partitions for topic and replication factor.

I already have Kafka running with quiet few topics having replication
factor 1 along with 1 default partition, is there a way to repartition /
increase partition of existing topics when i migrate to above setup ? I
think we can increase replication factor by Kafka rebalance tool.

Thanks alot for your help and time looking into this.

BR,

Le

On Mon, Mar 6, 2017 at 12:20 PM, Hans Jespersen 

wrote:

Jens,

I think you are correct that a 4 node zookeeper ensemble can be made to
work but it will be slightly less resilient than a 3 node ensemble

because

it can only tolerate 1 failure (same as a 3 node ensemble) and the
likelihood of node failures is higher because there is 1 more node that
could fail.
So it SHOULD be an odd number of zookeeper nodes (not MUST).

-hans



On Mar 6, 2017, at 12:20 AM, Jens Rantil 

wrote:

Hi Hans,


On Mon, Mar 6, 2017 at 12:10 AM, Hans Jespersen 

wrote:

A 4 node zookeeper ensemble will not even work. It MUST be an odd

number

of zookeeper nodes to start.



Re: Kafka streams questions

2017-03-06 Thread Neil Moore
Thanks for the answers, Matthias.


You mention a metadata refresh interval. I see Kafka producers and consumers 
have a property called metadata.max.age.ms which sounds similar. From the 
documentation and looking at the Javadoc for Kafka streams it is not clear to 
me how I can affect KafkaStreams' discovery of topics and partitions. Is it by 
configuring consumers using the Properties/StreamsConfig object passed to 
KafkaStreams' constructor? I.e. something like


props.put(StreamsConfig.CONSUMER_PREFIX + "metadata.max.age.ms", 1)

..

KafkaStreams streams = new KafkaStreams(blah, props)


Thanks,


Neil
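
A slightly fuller sketch of that approach, assuming the consumer-prefixed key
is forwarded to the internal consumers (worth verifying against the Streams
version in use; the application id, bootstrap servers and the 30 s value are
placeholders):

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// forwarded to the internal consumers as metadata.max.age.ms
props.put(StreamsConfig.CONSUMER_PREFIX + "metadata.max.age.ms", "30000");

KafkaStreams streams = new KafkaStreams(builder, props);   // builder = your existing topology
streams.start();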


From: Matthias J. Sax 
Sent: 28 February 2017 22:26:39
To: users@kafka.apache.org
Subject: Re: Kafka streams questions

Adding partitions:

You should not add partitions at runtime -- it might break the semantics
of your application because it might "mess up" your hash partitioning.
Cf.
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowtoscaleaStreamsapp,i.e.,increasenumberofinputpartitions?


If you are sure, that this is not a concern, because you don't do any
stateful operation (or some _manual_ re-partitioning within your
application before any key-based operation), then Streams should pick up
added partitions automatically. This can take multiple minutes depending
on your metadata refresh interval (default is 5 minutes).


About rewinding consumer partition offsets:

There is no tool to do this right now. But there is a KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-122%3A+Add+Reset+Consumer+Group+Offsets+tooling

For now, you could write your own little Java program that manipulates the
offsets before you start your Streams application. However, be aware that
this will result in duplicate processing as there is currently no way to
reset your state stores.
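
A minimal sketch of such a program (run it while the Streams application is
stopped; the group id must equal the application.id, and the topic, partition
and offset below are placeholders):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RewindOffsets {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-streams-app");   // must match the Streams application.id
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        TopicPartition tp = new TopicPartition("input-topic", 0);
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(tp));
            consumer.seek(tp, 12345L);   // the offset to restart from
            consumer.commitSync();       // persists the new position for the group
        }
    }
}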


-Matthias




On 2/28/17 1:31 PM, Neil Moore wrote:
> Hello,
>
>
> I have a few questions that I couldn't find answers to in the documentation:
>
>
>   *   Can added partitions be auto-discovered by kafka-streams? In my 
> informal tests I have had to restart the stream nodes.
>   *   Is it possible to rewind the consumer for a particular 
> topic-partitions. e.g. if I have a Processor handling a topic-partition can I 
> rewind that a to an arbitrary offset? I saw that 
> kafka-streams-application-reset allows all partitions to be reset, but I'd 
> like to decide per partition.
>
> If anybody can shed any light on these issues I'd be most grateful.
>
> Thanks.
>



Re: Strange behaviour in Session Windows

2017-03-06 Thread Damian Guy
Hi Marco,

I've done some testing and found that there is a performance issue when
caching is enabled. I suspect this might be what you are hitting. It looks
to me that you can work around this by doing something like:

final StateStoreSupplier sessionStore =
Stores.create("session-store-name")
.withKeys(Serdes.String())
.withValues(mySessionSerde)
.persistent()
.sessionWindowed(TimeUnit.MINUTES.toMillis(7))
.build();


And then in your call to aggregate, pass in the sessionStore created above,
i.e.,

aggregate(
MySession::new,
MySession::aggregateSessions,
MySession::mergeSessions,
SessionWindows
.with(WINDOW_INACTIVITY_GAPS_MS),
mySessionSerde,
sessionStore)


Let us know how you get on.

Thanks,
Damian

On Mon, 6 Mar 2017 at 13:27 Marco Abitabile 
wrote:

> Thanks Damian,
>
> sure, you are right, these details are modified to be compliant with my
> company rules. However the main points are unchanged.
>
> The producer of the original data is a "data ingestor" that attach few
> extra fields and produces a message such as:
>
> row = new JsonObject({
>   "id" : 12345654,
>   "userDeviceId" : "",
>   "creationTime" : 1488801350660 //produced from the remote source
>   "receivedTime": 1488801363455 //placed by my data ingestor,
>   "extra_data1" : 123, //
>   "extra_data2" : 456  // extra data specific for my domain all this data
> are numbers
>   "extra_data2" : 789  //
> })
>
> then it sends records into SOURCE_TOPIC (that in this context is
> USER_ACTIVITIES_TOPIC) as follow:
>
> long creationTimestamp = row.getLong("creationTime");
> long rowId = row.getLong("id");
> ProducerRecord producerRecord = new
> ProducerRecord<>(USER_ACTIVITIES_TOPIC, 0, creationTimestamp, rowId,
> row.toString());
> producer.send(producerRecord);
>
> Noteworthy:
> - I'm using only one partition (right now. I'm still not in production and
> i'm discovering the feature) in production environment I would use more
> partitions
> - the message is a simple string containing json object (i'm not using Avro
> or similar)
>
> - in my streaming application:
>
> public class MySession{
>
> private final JsonObject sessionDetails;
>
> public MySession(){
> this.sessionDetails = new JsonObject();
> }
>
> public static KeyValue enhanceWithUserId_And_PutUserIdAsKey(String k,
> JsonObject j) {
> int userId = cache.get(j.get("userDeviceId"));
> return KeyValue.pair(userId, j);
> }
>
> public static MySession aggregate(String key, JsonObject value,
> MySession aggregate) {
> //basically MySession is a collection of all the raw data that the
> session is composed of
> aggregate.addRawData(value);
> return aggregate;
> }
>
> public static MySession merge(String key, MySession arg1, MySession
> arg2)
> {
> arg2.merge(arg1);
> return arg2;
> }
>
> }
>
>
> BTW (this will be a topic for another thread anyway...) is there a way to
> be con control of MySession lifecycle? I was thinking to pool them to
> reduce GC workload.
>
> thanks a lot for your precious help.
>
> Marco
>
> 2017-03-06 11:59 GMT+01:00 Damian Guy :
>
> > Hi Marco,
> >
> > Your config etc look ok.
> >
> > 1. It is pretty hard to tell what is going on from just your code below,
> > unfortunately. But the behaviour doesn't seem to be inline with what I'm
> > reading in the streams code. For example your MySession::new function
> > should be called once per record. The merger and aggregator should be
> > called pretty much immediately after that.
> >
> > 2. Data will be retained for a bit longer than the value used in
> > SessionWindows.until(..). The session store has 3 segments and we use the
> >  retention period (i.e., value of until()) to determine the segment
> length.
> > The segment length is calculated as:
> >
> >  Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL);
> >
> > Which in this case is 210,000 milliseconds (3.5 minutes). So maintaining
> > 3 segments means
> > there could be data that is about 10 minutes old.
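
Spelling the arithmetic out for this particular configuration (assuming
MIN_SEGMENT_INTERVAL is the 60-second constant in the streams code):

long retentionPeriodMs = 7 * 60 * 1000L;   // until(5 min inactivity gap + 2 min)
int numSegments = 3;
long minSegmentIntervalMs = 60 * 1000L;    // assumed MIN_SEGMENT_INTERVAL
long segmentLengthMs = Math.max(retentionPeriodMs / (numSegments - 1), minSegmentIntervalMs);
// segmentLengthMs = 210,000 ms = 3.5 minutes; 3 segments ~= 10.5 minutes of data kept around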
> >
> > Also this is completely driven by the data and specifically the time
> > extracted from the data. I'm not sure if you can provide a sample of the
> > data going through the system? It might be helpful in trying to debug the
> > issue. (I'm not seeing anything obvious in the code).
> > Also it might help if you can get some stack traces on the streams
> > instances that appear to be stuck.
> >
> > Thanks,
> > Damian
> > On Mon, 6 Mar 2017 at 09:59 Marco Abitabile 
> > wrote:
> >
> > > Hello,
> > >
> > > I'm playing around with the brand new SessionWindows. I have a simple
> > > topology such as:
> > >
> > > KStream sess =
> > >  builder.stream(stringSerde, jsonSerde, SOURCE_TOPIC);
> > > sess
> > > .map(MySession::enhanceWithUserId_And_PutUserIdAsKey)
> > > .groupByKey(stringSerde, 

Re: Performance and encryption

2017-03-06 Thread Hans Jespersen

It’s not a single message at a time that is encrypted with TLS, it’s the entire
network byte stream, so a Kafka broker can’t even see the Kafka Protocol
tunneled inside TLS unless it’s terminated at the broker.
It is true that losing the zero-copy optimization impacts performance somewhat,
but it’s not what I would call a “big impact” because Kafka does a lot of other
things to get its performance (like using page cache and doing lots of
sequential disk I/O). The difference should be something in the order of 25-30%
slower with TLS enabled, which is about what you would see with any other
messaging protocol with TLS on vs off.

If you wanted to encrypt each message independently before sending to Kafka 
then zero copy would still be in effect and all the consumers would get the 
same encrypted message (and have to understand how to decrypt it).
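
For illustration, message-level encryption of that kind could be hung off a
custom value serializer along these lines; the class name and the config key
are made up, and a real deployment would use an authenticated cipher mode and
proper key management rather than this bare-bones AES example:

import java.nio.charset.StandardCharsets;
import java.util.Map;
import javax.crypto.Cipher;
import javax.crypto.spec.SecretKeySpec;
import org.apache.kafka.common.serialization.Serializer;

// Hypothetical serializer: each record value is encrypted before it reaches the
// broker, so the broker only ever stores ciphertext and sendfile is unaffected.
public class EncryptingStringSerializer implements Serializer<String> {
    private SecretKeySpec key;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Assumption: a 16-byte key is supplied via a custom config entry.
        byte[] raw = ((String) configs.get("message.encryption.key")).getBytes(StandardCharsets.UTF_8);
        key = new SecretKeySpec(raw, "AES");
    }

    @Override
    public byte[] serialize(String topic, String data) {
        try {
            Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding"); // demo only; prefer an authenticated mode
            cipher.init(Cipher.ENCRYPT_MODE, key);
            return cipher.doFinal(data.getBytes(StandardCharsets.UTF_8));
        } catch (Exception e) {
            throw new RuntimeException("encryption failed", e);
        }
    }

    @Override
    public void close() {}
}

Consumers would then need the matching decrypting deserializer (and the key),
since the broker never sees plaintext.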

-hans



> On Mar 6, 2017, at 5:38 AM, Nicolas Motte  wrote:
> 
> Hi everyone,
> 
> I understand one of the reasons why Kafka is performant is by using
> zero-copy.
> 
> I often hear that when encryption is enabled, then Kafka has to copy the
> data in user space to decode the message, so it has a big impact on
> performance.
> 
> If it is true, I don t get why the message has to be decoded by Kafka. I
> would assume that whether the message is encrypted or not, Kafka simply
> receives it, appends it to the file, and when a consumer wants to read it,
> it simply reads at the right offset...
> 
> Also I m wondering if it s the case if we don t use keys (pure queuing
> system with key=null).
> 
> Cheers
> Nico



Re: Performance and encryption

2017-03-06 Thread Todd Palino
It’s not that Kafka has to decode it, it’s that it has to send it across
the network. This is specific to enabling TLS support (transport
encryption), and won’t affect any end-to-end encryption you do at the
client level.

The operation in question is called “zero copy”. In order to send a message
batch to a consumer, the Kafka broker must read it from disk (sometimes
it’s cached in memory, but that’s irrelevant here) and send it across the
network. The Linux kernel allows this to happen without having to copy the
data in memory (to move it from the disk buffers to the network buffers).
However, if TLS is enabled, the broker must first encrypt the data going
across the network. This means that it can no longer take advantage of the
zero copy optimization as it has to make a copy in the process of applying
the TLS encryption.
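
A rough sketch of the difference (this is just the shape of the two I/O paths,
not how the broker code is actually structured; handshaking and error handling
are omitted):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import javax.net.ssl.SSLEngine;

public class ZeroCopySketch {

    // Plaintext path: the kernel moves bytes from the log file to the socket
    // (sendfile under the hood) without surfacing them in user space.
    static long sendPlaintext(FileChannel log, long position, long count,
                              SocketChannel socket) throws IOException {
        return log.transferTo(position, count, socket);
    }

    // TLS path: the bytes have to be pulled into user space so they can be
    // encrypted before hitting the wire, so the copy is unavoidable.
    static void sendEncrypted(FileChannel log, long position, int count,
                              SocketChannel socket, SSLEngine engine) throws Exception {
        ByteBuffer plain = ByteBuffer.allocate(count);
        log.read(plain, position);                                   // copy into user space
        plain.flip();
        ByteBuffer wire = ByteBuffer.allocate(engine.getSession().getPacketBufferSize());
        engine.wrap(plain, wire);                                    // encrypt into a TLS record
        wire.flip();
        while (wire.hasRemaining()) {
            socket.write(wire);
        }
    }
}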

Now, how much of an impact this has on the broker operations is up for
debate, I think. We originally ran into this problem when TLS support was
added to Kafka and the zero-copy send for plaintext communications was
accidentally removed as well. At the time, we saw a
significant performance hit, and the code was patched to put it back.
However, since then I’ve turned on inter-broker TLS in all of our clusters,
and when we did that there was no performance hit. This is odd, because the
replica fetchers should take advantage of the same zero copy optimization.

It’s possible that it’s because it’s just one consumer (the replica
fetchers). We’re about to start testing additional consumers over TLS, so
we’ll see what happens at that point. All I can suggest right now is that
you test in your environment and see what the impact is. Oh, and using
message keys (or not) won’t matter here.

-Todd


On Mon, Mar 6, 2017 at 5:38 AM, Nicolas Motte  wrote:

> Hi everyone,
>
> I understand one of the reasons why Kafka is performant is by using
> zero-copy.
>
> I often hear that when encryption is enabled, then Kafka has to copy the
> data in user space to decode the message, so it has a big impact on
> performance.
>
> If it is true, I don t get why the message has to be decoded by Kafka. I
> would assume that whether the message is encrypted or not, Kafka simply
> receives it, appends it to the file, and when a consumer wants to read it,
> it simply reads at the right offset...
>
> Also I m wondering if it s the case if we don t use keys (pure queuing
> system with key=null).
>
> Cheers
> Nico
>



-- 
*Todd Palino*
Staff Site Reliability Engineer
Data Infrastructure Streaming



linkedin.com/in/toddpalino


Re: Having 4 Node Kafka Cluster

2017-03-06 Thread Hans Jespersen

In that case it’s really one cluster. Make sure to set different rack ids for
each server room so Kafka will ensure that the replicas always span both floors
and you don’t lose availability of data if a server room goes down.
You will have to configure one additional zookeeper node in each site, which you
will only ever start up if a site goes down, because otherwise 2 of 4 zookeeper
nodes is not a quorum. Again, you would be better off with 3 nodes because then you
would only have to do this in the site that has the single active node.

-hans


> On Mar 6, 2017, at 5:57 AM, Le Cyberian  wrote:
> 
> Hi Hans,
> 
> Thank you for your reply.
> 
> Its basically two different server rooms on different floors and they are
> connected with fiber connectivity so its almost like a local connection
> between them no network latencies / lag.
> 
> If i do a Mirror Maker / Replicator then i will not be able to use them at
> the same time for writes./ producers. because the consumers / producers
> will request from all of them
> 
> BR,
> 
> Lee
> 
> On Mon, Mar 6, 2017 at 2:50 PM, Hans Jespersen  wrote:
> 
>> What do you mean when you say you have "2 sites not datacenters"? You
>> should be very careful configuring a stretch cluster across multiple sites.
>> What is the RTT between the two sites? Why do you think that MIrror Maker
>> (or Confluent Replicator) would not work between the sites and yet you
>> think a stretch cluster will work? That seems wrong.
>> 
>> -hans
>> 
>> /**
>> * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
>> * h...@confluent.io (650)924-2670
>> */
>> 
>> On Mon, Mar 6, 2017 at 5:37 AM, Le Cyberian  wrote:
>> 
>>> Hi Guys,
>>> 
>>> Thank you very much for you reply.
>>> 
>>> The scenario which i have to implement is that i have 2 sites not
>>> datacenters so mirror maker would not work here.
>>> 
>>> There will be 4 nodes in total, like 2 in Site A and 2 in Site B. The
>> idea
>>> is to have Active-Active setup along with fault tolerance so that if one
>> of
>>> the site goes on the operations are normal.
>>> 
>>> In this case if i go ahead with 4 node-cluster of both zookeeper and
>> kafka
>>> it will give failover tolerance for 1 node only.
>>> 
>>> What do you suggest to do in this case ? because to divide between 2
>> sites
>>> it needs to be even number if that makes sense ? Also if possible some
>> help
>>> regarding partitions for topic and replication factor.
>>> 
>>> I already have Kafka running with quiet few topics having replication
>>> factor 1 along with 1 default partition, is there a way to repartition /
>>> increase partition of existing topics when i migrate to above setup ? I
>>> think we can increase replication factor by Kafka rebalance tool.
>>> 
>>> Thanks alot for your help and time looking into this.
>>> 
>>> BR,
>>> 
>>> Le
>>> 
>>> On Mon, Mar 6, 2017 at 12:20 PM, Hans Jespersen 
>> wrote:
>>> 
 Jens,
 
 I think you are correct that a 4 node zookeeper ensemble can be made to
 work but it will be slightly less resilient than a 3 node ensemble
>>> because
 it can only tolerate 1 failure (same as a 3 node ensemble) and the
 likelihood of node failures is higher because there is 1 more node that
 could fail.
 So it SHOULD be an odd number of zookeeper nodes (not MUST).
 
 -hans
 
 
> On Mar 6, 2017, at 12:20 AM, Jens Rantil 
>> wrote:
> 
> Hi Hans,
> 
>> On Mon, Mar 6, 2017 at 12:10 AM, Hans Jespersen 
 wrote:
>> 
>> A 4 node zookeeper ensemble will not even work. It MUST be an odd
>>> number
>> of zookeeper nodes to start.
> 
> 
> Are you sure about that? If Zookeer doesn't run with four nodes, that
 means
> a running ensemble of three can't be live-migrated to other nodes
 (because
> that's done by increasing the ensemble and then reducing it in the
>> case
 of
> 3-node ensembles). IIRC, you can run four Zookeeper nodes, but that
>>> means
> quorum will be three nodes, so there's no added benefit in terms of
> availability since you can only loose one node just like with a three
 node
> cluster.
> 
> Cheers,
> Jens
> 
> 
> --
> Jens Rantil
> Backend engineer
> Tink AB
> 
> Email: jens.ran...@tink.se
> Phone: +46 708 84 18 32
> Web: www.tink.se
> 
> Facebook  Linkedin
> 
> Twitter 
 
>>> 
>> 



Re: Having 4 Node Kafka Cluster

2017-03-06 Thread Le Cyberian
Hi Hans,

Thank you for your reply.

It's basically two different server rooms on different floors and they are
connected with fiber connectivity, so it's almost like a local connection
between them, with no network latency / lag.

If I do a Mirror Maker / Replicator then I will not be able to use them at
the same time for writes / producers, because the consumers / producers
will request from all of them.

I am somewhat confused :-/ about what to do in this case.

BR,

Lee

On Mon, Mar 6, 2017 at 2:50 PM, Hans Jespersen  wrote:

> What do you mean when you say you have "2 sites not datacenters"? You
> should be very careful configuring a stretch cluster across multiple sites.
> What is the RTT between the two sites? Why do you think that MIrror Maker
> (or Confluent Replicator) would not work between the sites and yet you
> think a stretch cluster will work? That seems wrong.
>
> -hans
>
> /**
>  * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
>  * h...@confluent.io (650)924-2670
>  */
>
> On Mon, Mar 6, 2017 at 5:37 AM, Le Cyberian  wrote:
>
> > Hi Guys,
> >
> > Thank you very much for you reply.
> >
> > The scenario which i have to implement is that i have 2 sites not
> > datacenters so mirror maker would not work here.
> >
> > There will be 4 nodes in total, like 2 in Site A and 2 in Site B. The
> idea
> > is to have Active-Active setup along with fault tolerance so that if one
> of
> > the site goes on the operations are normal.
> >
> > In this case if i go ahead with 4 node-cluster of both zookeeper and
> kafka
> > it will give failover tolerance for 1 node only.
> >
> > What do you suggest to do in this case ? because to divide between 2
> sites
> > it needs to be even number if that makes sense ? Also if possible some
> help
> > regarding partitions for topic and replication factor.
> >
> > I already have Kafka running with quiet few topics having replication
> > factor 1 along with 1 default partition, is there a way to repartition /
> > increase partition of existing topics when i migrate to above setup ? I
> > think we can increase replication factor by Kafka rebalance tool.
> >
> > Thanks alot for your help and time looking into this.
> >
> > BR,
> >
> > Le
> >
> > On Mon, Mar 6, 2017 at 12:20 PM, Hans Jespersen 
> wrote:
> >
> > > Jens,
> > >
> > > I think you are correct that a 4 node zookeeper ensemble can be made to
> > > work but it will be slightly less resilient than a 3 node ensemble
> > because
> > > it can only tolerate 1 failure (same as a 3 node ensemble) and the
> > > likelihood of node failures is higher because there is 1 more node that
> > > could fail.
> > > So it SHOULD be an odd number of zookeeper nodes (not MUST).
> > >
> > > -hans
> > >
> > >
> > > > On Mar 6, 2017, at 12:20 AM, Jens Rantil 
> wrote:
> > > >
> > > > Hi Hans,
> > > >
> > > >> On Mon, Mar 6, 2017 at 12:10 AM, Hans Jespersen 
> > > wrote:
> > > >>
> > > >> A 4 node zookeeper ensemble will not even work. It MUST be an odd
> > number
> > > >> of zookeeper nodes to start.
> > > >
> > > >
> > > > Are you sure about that? If Zookeer doesn't run with four nodes, that
> > > means
> > > > a running ensemble of three can't be live-migrated to other nodes
> > > (because
> > > > that's done by increasing the ensemble and then reducing it in the
> case
> > > of
> > > > 3-node ensembles). IIRC, you can run four Zookeeper nodes, but that
> > means
> > > > quorum will be three nodes, so there's no added benefit in terms of
> > > > availability since you can only loose one node just like with a three
> > > node
> > > > cluster.
> > > >
> > > > Cheers,
> > > > Jens
> > > >
> > > >
> > > > --
> > > > Jens Rantil
> > > > Backend engineer
> > > > Tink AB
> > > >
> > > > Email: jens.ran...@tink.se
> > > > Phone: +46 708 84 18 32
> > > > Web: www.tink.se
> > > >
> > > > Facebook  Linkedin
> > > >  > > companies_res_photo=VSRPsearchId%3A1057023381369207406670%
> > > 2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary>
> > > > Twitter 
> > >
> >
>


Re: Having 4 Node Kafka Cluster

2017-03-06 Thread Le Cyberian
Hi Hans,

Thank you for your reply.

It's basically two different server rooms on different floors and they are
connected with fiber connectivity, so it's almost like a local connection
between them, with no network latency / lag.

If I do a Mirror Maker / Replicator then I will not be able to use them at
the same time for writes / producers, because the consumers / producers
will request from all of them.

BR,

Lee

On Mon, Mar 6, 2017 at 2:50 PM, Hans Jespersen  wrote:

> What do you mean when you say you have "2 sites not datacenters"? You
> should be very careful configuring a stretch cluster across multiple sites.
> What is the RTT between the two sites? Why do you think that MIrror Maker
> (or Confluent Replicator) would not work between the sites and yet you
> think a stretch cluster will work? That seems wrong.
>
> -hans
>
> /**
>  * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
>  * h...@confluent.io (650)924-2670
>  */
>
> On Mon, Mar 6, 2017 at 5:37 AM, Le Cyberian  wrote:
>
> > Hi Guys,
> >
> > Thank you very much for you reply.
> >
> > The scenario which i have to implement is that i have 2 sites not
> > datacenters so mirror maker would not work here.
> >
> > There will be 4 nodes in total, like 2 in Site A and 2 in Site B. The
> idea
> > is to have Active-Active setup along with fault tolerance so that if one
> of
> > the site goes on the operations are normal.
> >
> > In this case if i go ahead with 4 node-cluster of both zookeeper and
> kafka
> > it will give failover tolerance for 1 node only.
> >
> > What do you suggest to do in this case ? because to divide between 2
> sites
> > it needs to be even number if that makes sense ? Also if possible some
> help
> > regarding partitions for topic and replication factor.
> >
> > I already have Kafka running with quiet few topics having replication
> > factor 1 along with 1 default partition, is there a way to repartition /
> > increase partition of existing topics when i migrate to above setup ? I
> > think we can increase replication factor by Kafka rebalance tool.
> >
> > Thanks alot for your help and time looking into this.
> >
> > BR,
> >
> > Le
> >
> > On Mon, Mar 6, 2017 at 12:20 PM, Hans Jespersen 
> wrote:
> >
> > > Jens,
> > >
> > > I think you are correct that a 4 node zookeeper ensemble can be made to
> > > work but it will be slightly less resilient than a 3 node ensemble
> > because
> > > it can only tolerate 1 failure (same as a 3 node ensemble) and the
> > > likelihood of node failures is higher because there is 1 more node that
> > > could fail.
> > > So it SHOULD be an odd number of zookeeper nodes (not MUST).
> > >
> > > -hans
> > >
> > >
> > > > On Mar 6, 2017, at 12:20 AM, Jens Rantil 
> wrote:
> > > >
> > > > Hi Hans,
> > > >
> > > >> On Mon, Mar 6, 2017 at 12:10 AM, Hans Jespersen 
> > > wrote:
> > > >>
> > > >> A 4 node zookeeper ensemble will not even work. It MUST be an odd
> > number
> > > >> of zookeeper nodes to start.
> > > >
> > > >
> > > > Are you sure about that? If Zookeer doesn't run with four nodes, that
> > > means
> > > > a running ensemble of three can't be live-migrated to other nodes
> > > (because
> > > > that's done by increasing the ensemble and then reducing it in the
> case
> > > of
> > > > 3-node ensembles). IIRC, you can run four Zookeeper nodes, but that
> > means
> > > > quorum will be three nodes, so there's no added benefit in terms of
> > > > availability since you can only loose one node just like with a three
> > > node
> > > > cluster.
> > > >
> > > > Cheers,
> > > > Jens
> > > >
> > > >
> > > > --
> > > > Jens Rantil
> > > > Backend engineer
> > > > Tink AB
> > > >
> > > > Email: jens.ran...@tink.se
> > > > Phone: +46 708 84 18 32
> > > > Web: www.tink.se
> > > >
> > > > Facebook  Linkedin
> > > >  > > companies_res_photo=VSRPsearchId%3A1057023381369207406670%
> > > 2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary>
> > > > Twitter 
> > >
> >
>


Re: Having 4 Node Kafka Cluster

2017-03-06 Thread Hans Jespersen
What do you mean when you say you have "2 sites not datacenters"? You
should be very careful configuring a stretch cluster across multiple sites.
What is the RTT between the two sites? Why do you think that Mirror Maker
(or Confluent Replicator) would not work between the sites and yet you
think a stretch cluster will work? That seems wrong.

-hans

/**
 * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
 * h...@confluent.io (650)924-2670
 */

On Mon, Mar 6, 2017 at 5:37 AM, Le Cyberian  wrote:

> Hi Guys,
>
> Thank you very much for you reply.
>
> The scenario which i have to implement is that i have 2 sites not
> datacenters so mirror maker would not work here.
>
> There will be 4 nodes in total, like 2 in Site A and 2 in Site B. The idea
> is to have Active-Active setup along with fault tolerance so that if one of
> the site goes on the operations are normal.
>
> In this case if i go ahead with 4 node-cluster of both zookeeper and kafka
> it will give failover tolerance for 1 node only.
>
> What do you suggest to do in this case ? because to divide between 2 sites
> it needs to be even number if that makes sense ? Also if possible some help
> regarding partitions for topic and replication factor.
>
> I already have Kafka running with quiet few topics having replication
> factor 1 along with 1 default partition, is there a way to repartition /
> increase partition of existing topics when i migrate to above setup ? I
> think we can increase replication factor by Kafka rebalance tool.
>
> Thanks alot for your help and time looking into this.
>
> BR,
>
> Le
>
> On Mon, Mar 6, 2017 at 12:20 PM, Hans Jespersen  wrote:
>
> > Jens,
> >
> > I think you are correct that a 4 node zookeeper ensemble can be made to
> > work but it will be slightly less resilient than a 3 node ensemble
> because
> > it can only tolerate 1 failure (same as a 3 node ensemble) and the
> > likelihood of node failures is higher because there is 1 more node that
> > could fail.
> > So it SHOULD be an odd number of zookeeper nodes (not MUST).
> >
> > -hans
> >
> >
> > > On Mar 6, 2017, at 12:20 AM, Jens Rantil  wrote:
> > >
> > > Hi Hans,
> > >
> > >> On Mon, Mar 6, 2017 at 12:10 AM, Hans Jespersen 
> > wrote:
> > >>
> > >> A 4 node zookeeper ensemble will not even work. It MUST be an odd
> number
> > >> of zookeeper nodes to start.
> > >
> > >
> > > Are you sure about that? If Zookeer doesn't run with four nodes, that
> > means
> > > a running ensemble of three can't be live-migrated to other nodes
> > (because
> > > that's done by increasing the ensemble and then reducing it in the case
> > of
> > > 3-node ensembles). IIRC, you can run four Zookeeper nodes, but that
> means
> > > quorum will be three nodes, so there's no added benefit in terms of
> > > availability since you can only loose one node just like with a three
> > node
> > > cluster.
> > >
> > > Cheers,
> > > Jens
> > >
> > >
> > > --
> > > Jens Rantil
> > > Backend engineer
> > > Tink AB
> > >
> > > Email: jens.ran...@tink.se
> > > Phone: +46 708 84 18 32
> > > Web: www.tink.se
> > >
> > > Facebook  Linkedin
> > >  > companies_res_photo=VSRPsearchId%3A1057023381369207406670%
> > 2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary>
> > > Twitter 
> >
>


Re: Having 4 Node Kafka Cluster

2017-03-06 Thread Le Cyberian
Hi Guys,

Thank you very much for your reply.

The scenario which I have to implement is that I have 2 sites, not
datacenters, so mirror maker would not work here.

There will be 4 nodes in total, like 2 in Site A and 2 in Site B. The idea
is to have an Active-Active setup along with fault tolerance, so that if one of
the sites goes down, operations continue normally.

In this case, if I go ahead with a 4-node cluster of both zookeeper and kafka,
it will give failover tolerance for 1 node only.

What do you suggest in this case? Because to divide between 2 sites
it needs to be an even number, if that makes sense? Also, if possible, some help
regarding partitions per topic and replication factor would be appreciated.

I already have Kafka running with quite a few topics having replication
factor 1 along with 1 default partition. Is there a way to repartition /
increase partitions of existing topics when I migrate to the above setup? I
think we can increase the replication factor with the Kafka rebalance tool.

Thanks a lot for your help and time looking into this.

BR,

Le

On Mon, Mar 6, 2017 at 12:20 PM, Hans Jespersen  wrote:

> Jens,
>
> I think you are correct that a 4 node zookeeper ensemble can be made to
> work but it will be slightly less resilient than a 3 node ensemble because
> it can only tolerate 1 failure (same as a 3 node ensemble) and the
> likelihood of node failures is higher because there is 1 more node that
> could fail.
> So it SHOULD be an odd number of zookeeper nodes (not MUST).
>
> -hans
>
>
> > On Mar 6, 2017, at 12:20 AM, Jens Rantil  wrote:
> >
> > Hi Hans,
> >
> >> On Mon, Mar 6, 2017 at 12:10 AM, Hans Jespersen 
> wrote:
> >>
> >> A 4 node zookeeper ensemble will not even work. It MUST be an odd number
> >> of zookeeper nodes to start.
> >
> >
> > Are you sure about that? If Zookeer doesn't run with four nodes, that
> means
> > a running ensemble of three can't be live-migrated to other nodes
> (because
> > that's done by increasing the ensemble and then reducing it in the case
> of
> > 3-node ensembles). IIRC, you can run four Zookeeper nodes, but that means
> > quorum will be three nodes, so there's no added benefit in terms of
> > availability since you can only loose one node just like with a three
> node
> > cluster.
> >
> > Cheers,
> > Jens
> >
> >
> > --
> > Jens Rantil
> > Backend engineer
> > Tink AB
> >
> > Email: jens.ran...@tink.se
> > Phone: +46 708 84 18 32
> > Web: www.tink.se
> >
> > Facebook  Linkedin
> >  companies_res_photo=VSRPsearchId%3A1057023381369207406670%
> 2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary>
> > Twitter 
>


Performance and encryption

2017-03-06 Thread Nicolas Motte
Hi everyone,

I understand that one of the reasons why Kafka is performant is its use of
zero-copy.

I often hear that when encryption is enabled, Kafka has to copy the
data in user space to decode the message, so it has a big impact on
performance.

If that is true, I don't get why the message has to be decoded by Kafka. I
would assume that whether the message is encrypted or not, Kafka simply
receives it, appends it to the file, and when a consumer wants to read it,
it simply reads at the right offset...

Also, I'm wondering whether it's the case if we don't use keys (pure queuing
system with key=null).

Cheers
Nico


Re: Strange behaviour in Session Windows

2017-03-06 Thread Marco Abitabile
Thanks Damian,

sure, you are right, these details are modified to be compliant with my
company rules. However the main points are unchanged.

The producer of the original data is a "data ingestor" that attaches a few
extra fields and produces a message such as:

row = new JsonObject({
  "id" : 12345654,
  "userDeviceId" : "",
  "creationTime" : 1488801350660 //produced from the remote source
  "receivedTime": 1488801363455 //placed by my data ingestor,
  "extra_data1" : 123, //
  "extra_data2" : 456  // extra data specific for my domain all this data
are numbers
  "extra_data2" : 789  //
})

then it sends records into SOURCE_TOPIC (that in this context is
USER_ACTIVITIES_TOPIC) as follow:

long creationTimestamp = row.getLong("creationTime");
long rowId = row.getLong("id");
ProducerRecord producerRecord = new
ProducerRecord<>(USER_ACTIVITIES_TOPIC, 0, creationTimestamp, rowId,
row.toString());
producer.send(producerRecord);

Noteworthy:
- I'm using only one partition right now (I'm still not in production and
I'm still discovering the feature); in a production environment I would use
more partitions
- the message is a simple string containing json object (i'm not using Avro
or similar)

- in my streaming application:

public class MySession{

private final JsonObject sessionDetails;

public MySession(){
this.sessionDetails = new JsonObject();
}

public static KeyValue enhanceWithUserId_And_PutUserIdAsKey(String k,
JsonObject j) {
int userId = cache.get(j.get("userDeviceId"));
return KeyValue.pair(userId, j);
}

public static MySession aggregate(String key, JsonObject value,
MySession aggregate) {
//basically MySession is a collection of all the raw data that the
session is composed of
aggregate.addRawData(value);
return aggregate;
}

public static MySession merge(String key, MySession arg1, MySession arg2)
{
arg2.merge(arg1);
return arg2;
}

}


BTW (this will be a topic for another thread anyway...) is there a way to
be in control of the MySession lifecycle? I was thinking of pooling the
instances to reduce GC workload.

thanks a lot for your precious help.

Marco

2017-03-06 11:59 GMT+01:00 Damian Guy :

> Hi Marco,
>
> Your config etc look ok.
>
> 1. It is pretty hard to tell what is going on from just your code below,
> unfortunately. But the behaviour doesn't seem to be inline with what I'm
> reading in the streams code. For example your MySession::new function
> should be called once per record. The merger and aggregator should be
> called pretty much immediately after that.
>
> 2. Data will be retained for a bit longer than the value used in
> SessionWindows.until(..). The session store has 3 segments and we use the
>  retention period (i.e., value of until()) to determine the segment length.
> The segment length is calculated as:
>
>  Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL);
>
> Which in this case is 210,000 milliseconds (3.5 minutes). So maintaining 3
> segments means there could be data that is about 10 minutes old.
>
> Also this is completely driven by the data and specifically the time
> extracted from the data. I'm not sure if you can provide a sample of the
> data going through the system? It might be helpful in trying to debug the
> issue. (I'm not seeing anything obvious in the code).
> Also it might help if you can get some stack traces on the streams
> instances that appear to be stuck.
>
> Thanks,
> Damian
> On Mon, 6 Mar 2017 at 09:59 Marco Abitabile 
> wrote:
>
> > Hello,
> >
> > I'm playing around with the brand new SessionWindows. I have a simple
> > topology such as:
> >
> > KStream sess =
> >  builder.stream(stringSerde, jsonSerde, SOURCE_TOPIC);
> > sess
> > .map(MySession::enhanceWithUserId_And_PutUserIdAsKey)
> > .groupByKey(stringSerde, jsonSerde)
> > .aggregate(
> > MySession::new,
> > MySession::aggregateSessions,
> > MySession::mergeSessions,
> > SessionWindows
> > .with(WINDOW_INACTIVITY_GAPS_MS)
> > .until(WINDOW_MAINTAIN_DURATION_MS),
> > .filter(MySession::filterOutZeroLenghtSessions)
> > .to(windowedSerde, mySessionSerde, SINK_TOPIC_KTABLE);
> >
> > these are the most important configuration I'm using, all the other
> configs
> > are the classical serdes and hosts props:
> >
> > private static final String WINDOW_INACTIVITY_GAPS_MS = 5_MINUTES
> > private static final String WINDOW_MAINTAIN_DURATION_MS = 5_MINUTES +
> > 2_MINUTES;
> >
> > private static final Properties props = new Properties();
> >
> > props.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_
> CONFIG,
> > ONE_DAY);
> >
> > The source stream has data arriving at around 100 messages/second
> >
> > I'm experiencing this behaviours:
> >
> > 1) MySession::new is called thousands of times, way way more of the
> number
> > of messages ingested 

Re: Strange behaviour in Session Windows

2017-03-06 Thread Damian Guy
Hi Marco,

Can you try setting StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG to 0 and
see if that resolves the issue?
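
For reference, a minimal sketch of how that property can be set (the rest of
the configuration -- application id, bootstrap servers, default serdes -- is
assumed to already be in place):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// ... application.id, bootstrap.servers, default serdes, etc. ...
// A cache size of 0 disables record caching, so every update to the session
// store is forwarded downstream immediately instead of being buffered.
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);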

Thanks,
Damian


On Mon, 6 Mar 2017 at 10:59 Damian Guy  wrote:

> Hi Marco,
>
> Your config etc look ok.
>
> 1. It is pretty hard to tell what is going on from just your code below,
> unfortunately. But the behaviour doesn't seem to be inline with what I'm
> reading in the streams code. For example your MySession::new function
> should be called once per record. The merger and aggregator should be
> called pretty much immediately after that.
>
> 2. Data will be retained for a bit longer than the value used in
> SessionWindows.until(..). The session store has 3 segments and we use the
>  retention period (i.e., value of until()) to determine the segment length.
> The segment length is calculated as:
>
>  Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL);
>
> Which in this case is 210,000 milliseconds (3.5 minutes). So maintaining 3
> segments means there could be data that is about 10 minutes old.
>
> Also this is completely driven by the data and specifically the time
> extracted from the data. I'm not sure if you can provide a sample of the
> data going through the system? It might be helpful in trying to debug the
> issue. (I'm not seeing anything obvious in the code).
> Also it might help if you can get some stack traces on the streams
> instances that appear to be stuck.
>
> Thanks,
> Damian
>
> On Mon, 6 Mar 2017 at 09:59 Marco Abitabile 
> wrote:
>
> Hello,
>
> I'm playing around with the brand new SessionWindows. I have a simple
> topology such as:
>
> KStream sess =
>  builder.stream(stringSerde, jsonSerde, SOURCE_TOPIC);
> sess
> .map(MySession::enhanceWithUserId_And_PutUserIdAsKey)
> .groupByKey(stringSerde, jsonSerde)
> .aggregate(
> MySession::new,
> MySession::aggregateSessions,
> MySession::mergeSessions,
> SessionWindows
> .with(WINDOW_INACTIVITY_GAPS_MS)
> .until(WINDOW_MAINTAIN_DURATION_MS),
> .filter(MySession::filterOutZeroLenghtSessions)
> .to(windowedSerde, mySessionSerde, SINK_TOPIC_KTABLE);
>
> these are the most important configuration I'm using, all the other configs
> are the classical serdes and hosts props:
>
> private static final String WINDOW_INACTIVITY_GAPS_MS = 5_MINUTES
> private static final String WINDOW_MAINTAIN_DURATION_MS = 5_MINUTES +
> 2_MINUTES;
>
> private static final Properties props = new Properties();
>
> props.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
> ONE_DAY);
>
> The source stream has data arriving at around 100 messages/second
>
> I'm experiencing this behaviours:
>
> 1) MySession::new is called thousands of times, way way more of the number
> of messages ingested (around 100 / 1000 times more) the most of this
> sessions never reach the end of the pipeline (even if I remove
> .filter(MySession::filterOutZeroLenghtSessions) ) and nor
> MySession::aggregateSessions
> and MySession::mergeSessions are invoked.
>
> Is this correct? I don't understand, maybe I've setup something wrong...
>
> 2) I can see that the stream pipeline can ingest the first 15 minutes of
> data and sessions that reach SINK_TOPIC_KTABLE  looks good. However:
>- every second that passes the pipeline gets slower and slower and
>- I can see new updates to old sessions also after
> .until(WINDOW_MAINTAIN_DURATION_MS)
> period.
>- the stream consumer starts to ingest new data with slower and slower
> rates as time passes, eventually reaching almost 0msg/sec
>
> I was expecting that after WINDOW_MAINTAIN_DURATION_MS i can see only new
> sessions and those that have been fired, will just be removed from session
> store and never touched again.
>
>
> At the beginning I was thinking that my pipeline was not setup correctly,
> however I've tried to follow slavishly the docs and I could not find where
> things can go wrong.
>
> Do you have some hints about this?
> Please let me know if you need more info about.
>
> thanks a lot,
> Marco
>
>


Re: Fixing two critical bugs in kafka streams

2017-03-06 Thread Damian Guy
Hi Sachin,

If it is a bug then please file a JIRA for it, too.

Thanks,
Damian

On Mon, 6 Mar 2017 at 11:23 Sachin Mittal  wrote:

> Ok that's great.
> So you have already fixed that issue.
>
> I have modified my PR to remove that change (which was done keeping
> 0.10.2.0 in mind).
>
> However the other issue is still valid.
>
> Please review that change. https://github.com/apache/kafka/pull/2642
>
>
> Thanks
> Sachin
>
>
> On Mon, Mar 6, 2017 at 3:56 PM, Damian Guy  wrote:
>
> > On trunk the CommitFailedException isn't thrown anymore. The
> commitOffsets
> > method doesn't throw an exception. It returns one if it was thrown. We
> used
> > to throw this exception during suspendTasksAndState, but we don't
> anymore.
> >
> > On Mon, 6 Mar 2017 at 05:04 Sachin Mittal  wrote:
> >
> > > Hi
> > > On CommitFailedException at onPartitionsRevoked if it is thrown it gets
> > > assigned to rebalanceException.
> > > This causes the stream thread to shutdown. I am not sure how we can
> > resume
> > > the thread.
> > >
> > > Note thread is not in invalid state because it already has been
> > > assigned new partitions and this exception happens when trying to
> revoke
> > > old partitions which have been moved to some other thread, so we need
> to
> > > swallow this exception at the StreamThread side too, just like we
> swallow
> > > it at ConsumerCoordinator.java
> > >
> > > Also I fixed this against code base 0.10.2.0 and the difference in that
> > vs
> > > trunk code is these lines
> > > 10.2.0
> > >if (firstException.get() == null) {
> > > firstException.set(commitOffsets());
> > >}
> > >  vs trunk
> > > if (firstException.get() == null) {
> > > // TODO: currently commit failures will not be thrown to
> > users
> > > // while suspending tasks; this need to be re-visit after
> > > KIP-98
> > > commitOffsets();
> > > }
> > > I am again not sure since this part is still a TODO, but looking at
> code
> > I
> > > see that commitOffsets can still throw the CommitFailedException which
> > > needs to be handled at onPartitionsRevoked.
> > >
> > > Hope this makes sense.
> > >
> > > On second issue, the deadlock is not caused by CommitFailedException,
> but
> > > after fixing the deadlock we need to make sure thread does not die due
> to
> > > unhandled CommitFailedException at onPartitionsRevoked.
> > > The deadlock issue is like this.
> > > If a thread has two partitions and while processing partition one it
> > takes
> > > more than MAX_POLL_INTERVAL_MS_CONFIG time, then this thread is evicted
> > > from the group and both partitions are now migrated to some other
> thread.
> > > Now when it tries to process the partition two it tries to get the lock
> > to
> > > rocks db. It won't get the lock since that partition is now moved to
> some
> > > other thread. So it keeps increasing the backoffTimeMs and keeps trying
> > to
> > > get the lock forever, thus reaching a deadlock.
> > > To fix this we need some upper bound of the time limit till it tries to
> > get
> > > that lock. And that upper bound has to be MAX_POLL_INTERVAL_MS_CONFIG,
> > > because if by that time it has not got the lock, we can see that this
> > > thread was evicted from the group and need to rejoin again to get new
> > > partitions.
> > >
> > > On JIRA issue I can create one and attach the part of logs where it
> keeps
> > > trying to get the lock with increasing backoffTimeMs.
> > >
> > > Let me know if these makes sense. Right now this is the best way we
> could
> > > come up with to handle stream thread failures.
> > >
> > > Also on a side note I feel we need more resilient streams. If we have
> say
> > > configured our streams application with 4 threads and for whatever
> > reason a
> > > thread dies, then application should itself (or via some exposed
> hooks),
> > > allow to restart a new thread (because in Java I guess same thread
> cannot
> > > be restarted), so that number of threads always stay what one has
> > > configured.
> > > I think exposed hooks will be better option to do this.
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > >
> > >
> > > On Mon, Mar 6, 2017 at 5:40 AM, Matthias J. Sax  >
> > > wrote:
> > >
> > > > Sachin,
> > > >
> > > > thanks a lot for contributing!
> > > >
> > > > Right now, I am not sure if I understand the change. On
> > > > CommitFailedException, why can we just resume the thread? To me, it
> > > > seems that the thread will be in an invalid state and thus it's not
> > safe
> > > > to just swallow the exception and keep going. Can you shed some
> light?
> > > >
> > > > And from my understanding, the deadlock is "caused" by the change
> from
> > > > above, right? So if it is safe to swallow the exception, we should do
> > > > some "clean up" to avoid the deadlock in the first place, instead of
> > > > applying an additional timeout.
> > > >
> > 

Re: Fixing two critical bugs in kafka streams

2017-03-06 Thread Sachin Mittal
Ok that's great.
So you have already fixed that issue.

I have modified my PR to remove that change (which was done keeping
0.10.2.0 in mind).

However the other issue is still valid.

Please review that change. https://github.com/apache/kafka/pull/2642


Thanks
Sachin


On Mon, Mar 6, 2017 at 3:56 PM, Damian Guy  wrote:

> On trunk the CommitFailedException isn't thrown anymore. The commitOffsets
> method doesn't throw an exception. It returns one if it was thrown. We used
> to throw this exception during suspendTasksAndState, but we don't anymore.
>
> On Mon, 6 Mar 2017 at 05:04 Sachin Mittal  wrote:
>
> > Hi
> > On CommitFailedException at onPartitionsRevoked if it is thrown it gets
> > assigned to rebalanceException.
> > This causes the stream thread to shutdown. I am not sure how we can
> resume
> > the thread.
> >
> > Note thread is not in invalid state because it already has been
> > assigned new partitions and this exception happens when trying to revoke
> > old partitions which have been moved to some other thread, so we need to
> > swallow this exception at the StreamThread side too, just like we swallow
> > it at ConsumerCoordinator.java
> >
> > Also I fixed this against code base 0.10.2.0 and the difference in that
> vs
> > trunk code is these lines
> > 10.2.0
> >if (firstException.get() == null) {
> > firstException.set(commitOffsets());
> >}
> >  vs trunk
> > if (firstException.get() == null) {
> > // TODO: currently commit failures will not be thrown to
> users
> > // while suspending tasks; this need to be re-visit after
> > KIP-98
> > commitOffsets();
> > }
> > I am again not sure since this part is still a TODO, but looking at code
> I
> > see that commitOffsets can still throw the CommitFailedException which
> > needs to be handled at onPartitionsRevoked.
> >
> > Hope this makes sense.
> >
> > On second issue, the deadlock is not caused by CommitFailedException, but
> > after fixing the deadlock we need to make sure thread does not die due to
> > unhandled CommitFailedException at onPartitionsRevoked.
> > The deadlock issue is like this.
> > If a thread has two partitions and while processing partition one it
> takes
> > more than MAX_POLL_INTERVAL_MS_CONFIG time, then this thread is evicted
> > from the group and both partitions are now migrated to some other thread.
> > Now when it tries to process the partition two it tries to get the lock
> to
> > rocks db. It won't get the lock since that partition is now moved to some
> > other thread. So it keeps increasing the backoffTimeMs and keeps trying
> to
> > get the lock forever, thus reaching a deadlock.
> > To fix this we need some upper bound of the time limit till it tries to
> get
> > that lock. And that upper bound has to be MAX_POLL_INTERVAL_MS_CONFIG,
> > because if by that time it has not got the lock, we can see that this
> > thread was evicted from the group and need to rejoin again to get new
> > partitions.
> >
> > On JIRA issue I can create one and attach the part of logs where it keeps
> > trying to get the lock with increasing backoffTimeMs.
> >
> > Let me know if these makes sense. Right now this is the best way we could
> > come up with to handle stream thread failures.
> >
> > Also on a side note I feel we need more resilient streams. If we have say
> > configured our streams application with 4 threads and for whatever
> reason a
> > thread dies, then application should itself (or via some exposed hooks),
> > allow to restart a new thread (because in Java I guess same thread cannot
> > be restarted), so that number of threads always stay what one has
> > configured.
> > I think exposed hooks will be better option to do this.
> >
> > Thanks
> > Sachin
> >
> >
> >
> >
> > On Mon, Mar 6, 2017 at 5:40 AM, Matthias J. Sax 
> > wrote:
> >
> > > Sachin,
> > >
> > > thanks a lot for contributing!
> > >
> > > Right now, I am not sure if I understand the change. On
> > > CommitFailedException, why can we just resume the thread? To me, it
> > > seems that the thread will be in an invalid state and thus it's not
> safe
> > > to just swallow the exception and keep going. Can you shed some light?
> > >
> > > And from my understanding, the deadlock is "caused" by the change from
> > > above, right? So if it is safe to swallow the exception, we should do
> > > some "clean up" to avoid the deadlock in the first place, instead of
> > > applying an additional timeout.
> > >
> > > Also, if this is a bug, we should have a JIRA.
> > >
> > > -Matthias
> > >
> > >
> > > On 3/5/17 4:11 AM, Sachin Mittal wrote:
> > > > Hi,
> > > > Please find the new PR
> > > > https://github.com/apache/kafka/pull/2642/
> > > >
> > > > I see that in trunk there has been change which is different from in
> > > 10.2.0
> > > >
> > > > 10.2.0
> > > >if (firstException.get() == null) {
> > > > 

Re: Having 4 Node Kafka Cluster

2017-03-06 Thread Hans Jespersen
Jens,

I think you are correct that a 4 node zookeeper ensemble can be made to work 
but it will be slightly less resilient than a 3 node ensemble because it can 
only tolerate 1 failure (same as a 3 node ensemble) and the likelihood of node 
failures is higher because there is 1 more node that could fail.
So it SHOULD be an odd number of zookeeper nodes (not MUST).
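
The arithmetic behind that, as a quick sketch:

// ZooKeeper needs a strict majority of the ensemble to form a quorum.
int ensembleSize = 4;
int quorum = ensembleSize / 2 + 1;              // 3
int toleratedFailures = ensembleSize - quorum;  // 1 -- same as a 3-node ensemble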

-hans


> On Mar 6, 2017, at 12:20 AM, Jens Rantil  wrote:
> 
> Hi Hans,
> 
>> On Mon, Mar 6, 2017 at 12:10 AM, Hans Jespersen  wrote:
>> 
>> A 4 node zookeeper ensemble will not even work. It MUST be an odd number
>> of zookeeper nodes to start.
> 
> 
> Are you sure about that? If ZooKeeper doesn't run with four nodes, that means
> a running ensemble of three can't be live-migrated to other nodes (because
> that's done by increasing the ensemble and then reducing it in the case of
> 3-node ensembles). IIRC, you can run four Zookeeper nodes, but that means
> quorum will be three nodes, so there's no added benefit in terms of
> availability since you can only lose one node just like with a three node
> cluster.
> 
> Cheers,
> Jens
> 
> 
> -- 
> Jens Rantil
> Backend engineer
> Tink AB
> 
> Email: jens.ran...@tink.se
> Phone: +46 708 84 18 32
> Web: www.tink.se
> 
> Facebook  Linkedin
> 
> Twitter 


Re: Strange behaviour in Session Windows

2017-03-06 Thread Damian Guy
Hi Marco,

Your config etc look ok.

1. It is pretty hard to tell what is going on from just your code below,
unfortunately. But the behaviour doesn't seem to be in line with what I'm
reading in the streams code. For example your MySession::new function
should be called once per record. The merger and aggregator should be
called pretty much immediately after that.

2. Data will be retained for a bit longer than the value used in
SessionWindows.until(..). The session store has 3 segments and we use the
 retention period (i.e., value of until()) to determine the segment length.
The segment length is calculated as:

 Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL);

Which in this case is 210,000 milliseconds (3.5 minutes). So maintaining 3
segments means there could be data that is about 10 minutes old.
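
To make that arithmetic concrete for the configuration above (a sketch; the
60-second minimum segment interval is an assumption about the store
internals):

// retention = until() = 5 + 2 minutes
final long retentionMs = 7 * 60 * 1000L;            // 420,000 ms
final int numSegments = 3;
final long minSegmentIntervalMs = 60 * 1000L;       // assumed minimum

final long segmentLengthMs =
    Math.max(retentionMs / (numSegments - 1), minSegmentIntervalMs);
// segmentLengthMs = 210,000 ms (3.5 minutes); 3 such segments span roughly
// 10.5 minutes, which is why records older than until() can still be updated.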

Also this is completely driven by the data and specifically the time
extracted from the data. I'm not sure if you can provide a sample of the
data going through the system? It might be helpful in trying to debug the
issue. (I'm not seeing anything obvious in the code).
Also it might help if you can get some stack traces on the streams
instances that appear to be stuck.

Thanks,
Damian
On Mon, 6 Mar 2017 at 09:59 Marco Abitabile 
wrote:

> Hello,
>
> I'm playing around with the brand new SessionWindows. I have a simple
> topology such as:
>
> KStream sess =
>  builder.stream(stringSerde, jsonSerde, SOURCE_TOPIC);
> sess
> .map(MySession::enhanceWithUserId_And_PutUserIdAsKey)
> .groupByKey(stringSerde, jsonSerde)
> .aggregate(
> MySession::new,
> MySession::aggregateSessions,
> MySession::mergeSessions,
> SessionWindows
> .with(WINDOW_INACTIVITY_GAPS_MS)
> .until(WINDOW_MAINTAIN_DURATION_MS),
> .filter(MySession::filterOutZeroLenghtSessions)
> .to(windowedSerde, mySessionSerde, SINK_TOPIC_KTABLE);
>
> these are the most important configuration I'm using, all the other configs
> are the classical serdes and hosts props:
>
> private static final String WINDOW_INACTIVITY_GAPS_MS = 5_MINUTES
> private static final String WINDOW_MAINTAIN_DURATION_MS = 5_MINUTES +
> 2_MINUTES;
>
> private static final Properties props = new Properties();
>
> props.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
> ONE_DAY);
>
> The source stream has data arriving at around 100 messages/second
>
> I'm experiencing this behaviours:
>
> 1) MySession::new is called thousands of times, way way more of the number
> of messages ingested (around 100 / 1000 times more) the most of this
> sessions never reach the end of the pipeline (even if I remove
> .filter(MySession::filterOutZeroLenghtSessions) ) and nor
> MySession::aggregateSessions
> and MySession::mergeSessions are invoked.
>
> Is this correct? I don't understand, maybe I've setup something wrong...
>
> 2) I can see that the stream pipeline can ingest the first 15 minutes of
> data and sessions that reach SINK_TOPIC_KTABLE  looks good. However:
>- every second that passes the pipeline gets slower and slower and
>- I can see new updates to old sessions also after
> .until(WINDOW_MAINTAIN_DURATION_MS)
> period.
>- the stream consumer starts to ingest new data with slower and slower
> rates as time passes, eventually reaching almost 0msg/sec
>
> I was expecting that after WINDOW_MAINTAIN_DURATION_MS i can see only new
> sessions and those that have been fired, will just be removed from session
> store and never touched again.
>
>
> At the beginning I was thinking that my pipeline was not setup correctly,
> however I've tried to follow slavishly the docs and I could not find where
> things can go wrong.
>
> Do you have some hints about this?
> Please let me know if you need more info about.
>
> thanks a lot,
> Marco
>


Re: Fixing two critical bugs in kafka streams

2017-03-06 Thread Damian Guy
On trunk the CommitFailedException isn't thrown anymore. The commitOffsets
method doesn't throw an exception. It returns one if it was thrown. We used
to throw this exception during suspendTasksAndState, but we don't anymore.
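
In other words, callers now inspect the returned exception rather than
catching a thrown one -- roughly this pattern (a sketch, not the actual
Streams code):

// commitOffsets() returns the failure (or null if the commit succeeded).
final RuntimeException firstException = commitOffsets();
if (firstException != null) {
    // decide how to surface it instead of letting it abort the rebalance callback
}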

On Mon, 6 Mar 2017 at 05:04 Sachin Mittal  wrote:

> Hi
> On CommitFailedException at onPartitionsRevoked if it is thrown it gets
> assigned to rebalanceException.
> This causes the stream thread to shutdown. I am not sure how we can resume
> the thread.
>
> Note thread is not in invalid state because it already has been
> assigned new partitions and this exception happens when trying to revoke
> old partitions which have been moved to some other thread, so we need to
> swallow this exception at the StreamThread side too, just like we swallow
> it at ConsumerCoordinator.java
>
> Also I fixed this against code base 0.10.2.0 and the difference in that vs
> trunk code is these lines
> 10.2.0
>if (firstException.get() == null) {
> firstException.set(commitOffsets());
>}
>  vs trunk
> if (firstException.get() == null) {
> // TODO: currently commit failures will not be thrown to users
> // while suspending tasks; this need to be re-visit after
> KIP-98
> commitOffsets();
> }
> I am again not sure since this part is still a TODO, but looking at code I
> see that commitOffsets can still throw the CommitFailedException which
> needs to be handled at onPartitionsRevoked.
>
> Hope this makes sense.
>
> On second issue, the deadlock is not caused by CommitFailedException, but
> after fixing the deadlock we need to make sure thread does not die due to
> unhandled CommitFailedException at onPartitionsRevoked.
> The deadlock issue is like this.
> If a thread has two partitions and while processing partition one it takes
> more than MAX_POLL_INTERVAL_MS_CONFIG time, then this thread is evicted
> from the group and both partitions are now migrated to some other thread.
> Now when it tries to process the partition two it tries to get the lock to
> rocks db. It won't get the lock since that partition is now moved to some
> other thread. So it keeps increasing the backoffTimeMs and keeps trying to
> > get the lock forever, thus reaching a deadlock.
> To fix this we need some upper bound of the time limit till it tries to get
> that lock. And that upper bound has to be MAX_POLL_INTERVAL_MS_CONFIG,
> because if by that time it has not got the lock, we can see that this
> thread was evicted from the group and need to rejoin again to get new
> partitions.
>
> On JIRA issue I can create one and attach the part of logs where it keeps
> trying to get the lock with increasing backoffTimeMs.
>
> Let me know if these makes sense. Right now this is the best way we could
> come up with to handle stream thread failures.
>
> Also on a side note I feel we need more resilient streams. If we have say
> configured our streams application with 4 threads and for whatever reason a
> thread dies, then application should itself (or via some exposed hooks),
> allow to restart a new thread (because in Java I guess same thread cannot
> be restarted), so that number of threads always stay what one has
> configured.
> I think exposed hooks will be better option to do this.
>
> Thanks
> Sachin
>
>
>
>
> On Mon, Mar 6, 2017 at 5:40 AM, Matthias J. Sax 
> wrote:
>
> > Sachin,
> >
> > thanks a lot for contributing!
> >
> > Right now, I am not sure if I understand the change. On
> > CommitFailedException, why can we just resume the thread? To me, it
> > seems that the thread will be in an invalid state and thus it's not safe
> > to just swallow the exception and keep going. Can you shed some light?
> >
> > And from my understanding, the deadlock is "caused" by the change from
> > above, right? So if it is safe to swallow the exception, we should do
> > some "clean up" to avoid the deadlock in the first place, instead of
> > applying an additional timeout.
> >
> > Also, if this is a bug, we should have a JIRA.
> >
> > -Matthias
> >
> >
> > On 3/5/17 4:11 AM, Sachin Mittal wrote:
> > > Hi,
> > > Please find the new PR
> > > https://github.com/apache/kafka/pull/2642/
> > >
> > > I see that in trunk there has been change which is different from in
> > 10.2.0
> > >
> > > 10.2.0
> > >if (firstException.get() == null) {
> > > firstException.set(commitOffsets());
> > >}
> > >  vs trunk
> > > if (firstException.get() == null) {
> > > // TODO: currently commit failures will not be thrown to
> > users
> > > // while suspending tasks; this need to be re-visit after
> > KIP-98
> > > commitOffsets();
> > > }
> > > I am not sure, in view of this, whether my part of the fix is still valid.
> Looks
> > > like it is still valid.
> > >
> > > Also on side note what is the policy of closing a branch that is just
> > > released.
> > >
> > > Since you have 

Re: Kafka Streams - ordering grouped messages

2017-03-06 Thread Damian Guy
Hi Ofir,

My advice is to handle the duplicates. As you said, compaction only runs on
the non-active segments, so there could be duplicates in the active segment.
Further, even after compaction has run there could still be duplicates.
You can attempt to minimize the occurrence of duplicates by adjusting the
segment size of the topic(s) in question. If you have a smaller segment
size then compaction will get a chance to run more frequently; however, this
also means you'll have more files.
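
If the downstream consumer is itself a Streams application, one way to
"handle the duplicates" is to read the compacted topic as a KTable, which
keeps only the latest value per key regardless of when compaction physically
runs. A minimal sketch (topic, store and serde choices below are made up):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

KStreamBuilder builder = new KStreamBuilder();
// Later records for the same key overwrite earlier ones in the local store,
// so duplicates collapse naturally on the consuming side.
KTable<String, String> latest =
    builder.table(Serdes.String(), Serdes.String(), "compacted-topic", "latest-store");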

Thanks,
Damian

On Sun, 5 Mar 2017 at 10:24 Ofir Sharony 
wrote:

> Thanks guys,
>
> I would like to continue where we stopped (late arriving records):
>
> As I understand, the best practice to handle late arriving records is
> enabling Kafka log compaction, thus keeping only the latest record of a
> certain key.
> As log compaction starts to do its magic only on non-active segments, I'm
> trying to understand what's the best approach in case I want to send my
> data downstream in real time.
>
> Would you advise planning my downstream apps to handle these key
> duplications, or is there any way to remove them in real time or close to it
> (let's say up to 1 minute)?
>
> *Ofir Sharony*
> BackEnd Tech Lead
>
> Mobile: +972-54-7560277 <+972%2054-756-0277> | ofir.shar...@myheritage.com
> | www.myheritage.com
> MyHeritage Ltd., 3 Ariel Sharon St., Or Yehuda 60250, Israel
>
> 
>
> 
>  
> 
>
>
> On Wed, Mar 1, 2017 at 7:44 PM, Matthias J. Sax 
> wrote:
>
> > Just wanted to add that there is always the potential for late
> > arriving records, and thus, ordering by timestamp will never be
> perfect...
> >
> > You should rather try to design your application in a way such that it
> > can handle out-of-order data gracefully and try to avoid the necessity
> > of ordering records by timestamp.
> >
> >
> > -Matthias
> >
> > On 3/1/17 7:31 AM, Damian Guy wrote:
> > > You could implement your own sorting algorithm using the low-level
> > > Processor API, i.e., you have a processor that keeps a sorted list of
> > > records and then, periodically, perhaps on punctuate, emits the sorted
> > > messages downstream. You could do something like:
> > >
> > > builder.stream("topic").transform(new TransformerSupplier() {
> > >
> > > @Override
> > > public Transformer get() {
> > > return new TheTransformer();
> > > }
> > > }).groupByKey().reduce(..);
> > >
> > > Where the TheTransformer might look something like:
> > >
> > > private static class TheTransformer<K, V, R> implements Transformer<K, V, R> {
> > > private ProcessorContext context;
> > > private TreeMap<K, V> sorted = new TreeMap<>();
> > >
> > > @Override
> > > public void init(final ProcessorContext context) {
> > > this.context = context;
> > > context.schedule(1000); // punctuate every 1 second of
> > streams-time
> > > }
> > >
> > > @Override
> > > public R transform(final K key, final V value) {
> > > // do stuff
> > > sorted.put(key, value);
> > > return null; // nothing to emit now; records are forwarded in punctuate()
> > > }
> > >
> > > @Override
> > > public R punctuate(final long timestamp) {
> > > for (final Map.Entry<K, V> kvEntry : sorted.entrySet()) {
> > > context.forward(kvEntry.getKey(), kvEntry.getValue());
> > > }
> > > sorted.clear();
> > > return null;
> > > }
> > >
> > > @Override
> > > public void close() {
> > >
> > > }
> > > }
> > >
> > >
> > >
> > >
> > >
> > > On Wed, 1 Mar 2017 at 13:04 Ofir Sharony 
> > > wrote:
> > >
> > >> Is there any way to sort grouped records before sending them to the
> > >> reducer?
> > >>
> > >> *Ofir Sharony*
> > >> BackEnd Tech Lead
> > >>
> > >> Mobile: +972-54-7560277 <+972%2054-756-0277> <+972%2054-756-0277> |
> > ofir.shar...@myheritage.com
> > >> | www.myheritage.com
> > >> MyHeritage Ltd., 3 Ariel Sharon St., Or Yehuda 60250, Israel
> > >>
> > >> 
> > >>
> > >> 
> > >>   >
> > >> 
> > >>
> > >>
> > >> On Wed, Mar 1, 2017 at 3:03 PM, Damian Guy 
> > wrote:
> > >>
> > >>> Hi,
> > >>>
> > >>> The TimestampExtractor won't affect the order the records arrive in.
> It
> > >>> just provides a way for developers to use a timestamp other than the
> > >>> default.
> > >>>
> > >>> Thanks,
> > >>> Damian
> > >>>
> > >>> On Wed, 1 Mar 2017 at 12:34 Ofir Sharony <
> ofir.shar...@myheritage.com>
> > >>> wrote:
> > >>>
> >  Hi,
> > 
> >  I have the following code on a stream:
> > 
> >  .selectKey(...)
> >  .groupByKey(...)
> >  .reduce(...)
> > 
> > 

Strange behaviour in Session Windows

2017-03-06 Thread Marco Abitabile
Hello,

I'm playing around with the brand new SessionWindows. I have a simple
topology such as:

KStream<String, JsonObject> sess =
 builder.stream(stringSerde, jsonSerde, SOURCE_TOPIC);
sess
.map(MySession::enhanceWithUserId_And_PutUserIdAsKey)
.groupByKey(stringSerde, jsonSerde)
.aggregate(
MySession::new,
MySession::aggregateSessions,
MySession::mergeSessions,
SessionWindows
.with(WINDOW_INACTIVITY_GAPS_MS)
.until(WINDOW_MAINTAIN_DURATION_MS))
.filter(MySession::filterOutZeroLenghtSessions)
.to(windowedSerde, mySessionSerde, SINK_TOPIC_KTABLE);

these are the most important configuration I'm using, all the other configs
are the classical serdes and hosts props:

private static final long WINDOW_INACTIVITY_GAPS_MS = 5_MINUTES;
private static final long WINDOW_MAINTAIN_DURATION_MS = 5_MINUTES +
2_MINUTES;

private static final Properties props = new Properties();
props.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
ONE_DAY);

The source stream has data arriving at around 100 messages/second

I'm experiencing this behaviours:

1) MySession::new is called thousands of times, way more than the number
of messages ingested (around 100 to 1,000 times more); most of these
sessions never reach the end of the pipeline (even if I remove
.filter(MySession::filterOutZeroLenghtSessions) ), and neither
MySession::aggregateSessions nor MySession::mergeSessions are invoked.

Is this correct? I don't understand, maybe I've setup something wrong...

2) I can see that the stream pipeline can ingest the first 15 minutes of
data, and the sessions that reach SINK_TOPIC_KTABLE look good. However:
   - with every second that passes the pipeline gets slower and slower
   - I can see new updates to old sessions even after the
.until(WINDOW_MAINTAIN_DURATION_MS) period
   - the stream consumer ingests new data at slower and slower rates as
time passes, eventually reaching almost 0 msg/sec

I was expecting that after WINDOW_MAINTAIN_DURATION_MS I would see only new
sessions, and that those already emitted would simply be removed from the
session store and never touched again.


At the beginning I was thinking that my pipeline was not set up correctly;
however, I've tried to follow the docs closely and I could not find where
things could go wrong.

Do you have some hints about this?
Please let me know if you need more info about.

thanks a lot,
Marco


kafka authenticate

2017-03-06 Thread 陈江枫
Hi, all
I'm trying to modify Kafka authentication to use our own authentication
procedure; authorization will stick to Kafka's ACLs.
Does every entity that fetches data from a certain topic need to go through
authentication (including Kafka Streams, replica-to-leader fetching, etc.)?


Re: Kafka streams DSL advantage

2017-03-06 Thread Michael Noll
The DSL has some unique features that aren't in the Processor API, such as:

- KStream and KTable abstractions.
- Support for time windows (tumbling windows, hopping windows) and session
windows.  The Processor API only has stream-time based `punctuate()`.
- Record caching, which is slightly better than state store caching for the
Processor API (
http://docs.confluent.io/current/streams/developer-guide.html#memory-management
)
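
For example, the time-window support mentioned above boils down to a few
lines in the DSL (a minimal sketch; the topic and store names are made up):

import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

KStreamBuilder builder = new KStreamBuilder();
// Count events per key in 1-minute tumbling windows, materialized in a store.
KTable<Windowed<String>, Long> counts = builder
    .stream(Serdes.String(), Serdes.String(), "events")
    .groupByKey(Serdes.String(), Serdes.String())
    .count(TimeWindows.of(TimeUnit.MINUTES.toMillis(1)), "event-counts-store");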

You can re-implement most of these features in the processor API too (after
all, the DSL itself is based on the Processor API), but that's manual DIY
work.

That being said, you can also combine the DSL and the Processor API via the
DSL's `process()` and `transform` / `transformValues` operations.  See [1]
for an example.
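
A minimal sketch of that combination, reusing the builder and serdes from the
sketch above (MyValueTransformer and the store name are hypothetical, and the
store must have been registered via builder.addStateStore(...) beforehand):

// DSL for the plumbing, Processor API (via transformValues) for the custom step.
// MyValueTransformer is assumed to implement ValueTransformer<String, String>.
builder
    .stream(Serdes.String(), Serdes.String(), "input-topic")
    .transformValues(MyValueTransformer::new, "my-state-store")
    .to(Serdes.String(), Serdes.String(), "output-topic");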

Best,
Michael



[1]
https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/test/java/io/confluent/examples/streams/MixAndMatchLambdaIntegrationTest.java





On Sun, Mar 5, 2017 at 8:01 PM, Shimi Kiviti  wrote:

> Hi Eveyone,
>
> I have seen a few times (here on the mailing list) where someone wanted to
> use the Kafka Streams DSL for something that wasn't possible to do with the
> DSL, and the suggestion was to use the Processor API.
>
> I was wondering, apart from the fluent functional code of the DSL, are there
> any other benefits of using the DSL over the processor API?
>
> thanks,
> Shimi
>


Re: Writing data from kafka-streams to remote database

2017-03-06 Thread Michael Noll
I'd use option 2 (Kafka Connect).

Advantages of #2:

- The code is decoupled from the processing code and easier to refactor in
the future. (same as #4)
- The runtime/uptime/scalability of your Kafka Streams app (processing) is
decoupled from the runtime/uptime/scalability of the data ingestion into
your remote database.

"Remove the need for additional kafka topic." isn't a big win typically --
even though topics aren't free, there still quite cheap. ;-)

YMMV of course. :-)


On Sun, Mar 5, 2017 at 7:55 PM, Shimi Kiviti  wrote:

> Thank Eno,
>
> Yes, I am aware of that. It indeed looks like a very useful feature.
>
> The result of the processing in Kafka Streams is only a small amount of
> data that is required by our service.
> Currently it makes more sense for us to update the remote database where we
> have more data that our application requires.
> Also, the data should be available in case of failures. The remote database
> data is replicated. AFAIK, although the RocksDB changelog is backed by Kafka,
> if a node fails, the data will be unavailable until it is replicated to a
> different node.
>
> On Sun, Mar 5, 2017 at 4:38 PM, Eno Thereska 
> wrote:
>
> > Hi Shimi,
> >
> > Could you tell us more about your scenario? Kafka Streams uses embedded
> > databases (RocksDb) to store its state, so often you don't need to write
> > anything to an external database and you can query your streams state
> > directly from streams. Have a look at this blog if that matches your
> > scenario: https://www.confluent.io/blog/unifying-stream-processing-
> > and-interactive-queries-in-apache-kafka/  > blog/unifying-stream-processing-and-interactive-
> queries-in-apache-kafka/>.
> >
> > Cheers
> > Eno
> >
> > > On 5 Mar 2017, at 10:48, Shimi Kiviti  wrote:
> > >
> > > Hi Everyone,
> > >
> > > I was wondering about writing data to remote database.
> > > I see 4 possible options:
> > >
> > >   1. Read from a topic and write to the database.
> > >   2. Use kafka connect
> > >   3. Write from anywhere in kafka streams.
> > >   4. Register a CachedStateStore FlushListener that will send a batch
> of
> > >   records when the store flushes the records.
> > >
> > > Advantages of #4:
> > >
> > >   - The code is decoupled from the processing code and easier to
> refactor
> > >   in the future.
> > >   - Remove the need for additional kafka topic.
> > >
> > >
> > > Thanks,
> > >
> > > Shimi
> >
> >
>