Byzantine Fault Tolerance Implementation

2019-08-26 Thread Nayak, Soumya R.
Thanks Boyang.

Will check that link and raise my question there.

Regards,
Soumya

-Original Message-
From: Boyang Chen  
Sent: Tuesday, August 27, 2019 11:50 AM
To: users@kafka.apache.org
Subject: Re: Byzantine Fault Tolerance Implementation

Hey Nayak,

There is an ongoing KIP in the community about deprecating ZooKeeper:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-500%3A+Replace+ZooKeeper+with+a+Self-Managed+Metadata+Quorum
It should be a good place to raise your question about making the
consensus algorithm pluggable in the future.

Boyang

On Mon, Aug 26, 2019 at 10:41 PM Nayak, Soumya R. 
wrote:

> Hi Jorn,
>
> I was asking in the context of Hyperledger Fabric Blockchain, where a 
> Kafka/ZooKeeper cluster is used and multiple organizations take part 
> in the network and its transactions. There, a single failed system or 
> a malicious node could disrupt the whole network, which would be very 
> costly.
> With that in mind: is it possible to have a pluggable consensus 
> algorithm for ZooKeeper?
>
> Regards,
> Soumya
> -Original Message-
> From: Jörn Franke 
> Sent: Tuesday, August 27, 2019 11:03 AM
> To: users@kafka.apache.org
> Subject: Re: Byzantine Fault Tolerance Implementation
>
> What kind of stability problems do you have? It is surprising to me 
> that you have them, and it is unlikely that they are caused by a 
> specific consensus algorithm. If you have stability issues, I would 
> look for weak spots in your architecture.
>
> Btw, Paxos is a consensus mechanism, while BFT just describes a 
> specific type of failure in distributed systems, so "implementing BFT" 
> by itself probably does not make sense.
>
> > Am 27.08.2019 um 06:36 schrieb Nayak, Soumya R. :
> >
> > Hi Team,
> >
> > Currently, ZooKeeper and Kafka clusters are crash fault tolerant. 
> > ZooKeeper uses a Paxos-like protocol, ZooKeeper Atomic Broadcast (ZAB). 
> > Is there any plan, current or future, to implement ZooKeeper with a 
> > BFT algorithm? This might help to provide a more stable distributed 
> > environment when the cluster spans different machines.
> >
> > Regards,
> > Soumya
> >
> > 
> > **
> >  This message may contain confidential or 
> > proprietary information intended only for the use of the
> > addressee(s) named above or may contain information that is legally 
> > privileged. If you are not the intended addressee, or the person 
> > responsible for delivering it to the intended addressee, you are 
> > hereby notified that reading, disseminating, distributing or copying 
> > this message is strictly prohibited. If you have received this 
> > message
> by mistake, please immediately notify us by replying to the message 
> and delete the original message and any copies immediately thereafter.
> >
> > If you received this email as a commercial message and would like to 
> > opt out of future commercial messages, please let us know and we 
> > will
> remove you from our distribution list.
> >
> > Thank you.~
> > 
> > **
> > 
> > FAFLD
>


Re: Byzantine Fault Tolerance Implementation

2019-08-26 Thread Boyang Chen
Hey Nayak,

There is an ongoing KIP in the community about deprecating ZooKeeper:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-500%3A+Replace+ZooKeeper+with+a+Self-Managed+Metadata+Quorum
It should be a good place to raise your question about making the
consensus algorithm pluggable in the future.

Boyang

On Mon, Aug 26, 2019 at 10:41 PM Nayak, Soumya R. 
wrote:

> Hi Jorn,
>
> I was asking in the context of Hyperledger Fabric Blockchain, where a
> Kafka/ZooKeeper cluster is used and multiple organizations take part
> in the network and its transactions. There, a single failed system or
> a malicious node could disrupt the whole network, which would be very
> costly.
> With that in mind: is it possible to have a pluggable consensus
> algorithm for ZooKeeper?
>
> Regards,
> Soumya
> -Original Message-
> From: Jörn Franke 
> Sent: Tuesday, August 27, 2019 11:03 AM
> To: users@kafka.apache.org
> Subject: Re: Byzantine Fault Tolerance Implementation
>
> What kind of stability problems do you have? It is surprising to me
> that you have them, and it is unlikely that they are caused by a
> specific consensus algorithm. If you have stability issues, I would
> look for weak spots in your architecture.
>
> Btw, Paxos is a consensus mechanism, while BFT just describes a
> specific type of failure in distributed systems, so "implementing BFT"
> by itself probably does not make sense.
>
> > Am 27.08.2019 um 06:36 schrieb Nayak, Soumya R. :
> >
> > Hi Team,
> >
> > Currently, ZooKeeper and Kafka clusters are crash fault tolerant.
> > ZooKeeper uses a Paxos-like protocol, ZooKeeper Atomic Broadcast (ZAB).
> > Is there any plan, current or future, to implement ZooKeeper with a
> > BFT algorithm? This might help to provide a more stable distributed
> > environment when the cluster spans different machines.
> >
> > Regards,
> > Soumya
> >
>


Byzantine Fault Tolerance Implementation

2019-08-26 Thread Nayak, Soumya R.
Hi Jorn,

I was asking in the context of Hyperledger Fabric Blockchain, where a 
Kafka/ZooKeeper cluster is used and multiple organizations take part in the 
network and its transactions. There, a single failed system or a malicious 
node could disrupt the whole network, which would be very costly.
With that in mind: is it possible to have a pluggable consensus algorithm 
for ZooKeeper?

Regards,
Soumya
-Original Message-
From: Jörn Franke  
Sent: Tuesday, August 27, 2019 11:03 AM
To: users@kafka.apache.org
Subject: Re: Byzantine Fault Tolerance Implementation

What kind of stability problems do you have? It is surprising to me that 
you have them, and it is unlikely that they are caused by a specific 
consensus algorithm. If you have stability issues, I would look for weak 
spots in your architecture.

Btw, Paxos is a consensus mechanism, while BFT just describes a specific 
type of failure in distributed systems, so "implementing BFT" by itself 
probably does not make sense.

> Am 27.08.2019 um 06:36 schrieb Nayak, Soumya R. :
> 
> Hi Team,
> 
> Currently, ZooKeeper and Kafka clusters are crash fault tolerant.
> ZooKeeper uses a Paxos-like protocol, ZooKeeper Atomic Broadcast (ZAB). 
> Is there any plan, current or future, to implement ZooKeeper with a BFT 
> algorithm? This might help to provide a more stable distributed 
> environment when the cluster spans different machines.
> 
> Regards,
> Soumya
> 


Re: Byzantine Fault Tolerance Implementation

2019-08-26 Thread Jörn Franke
What kind of stability problems do you have? It is surprising to me that 
you have them, and it is unlikely that they are caused by a specific 
consensus algorithm. If you have stability issues, I would look for weak 
spots in your architecture.

Btw, Paxos is a consensus mechanism, while BFT just describes a specific 
type of failure in distributed systems, so "implementing BFT" by itself 
probably does not make sense.
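As a side note on the crash-fault vs Byzantine-fault distinction here: the standard bounds are that a crash-fault-tolerant protocol (ZAB, Raft) needs 2f+1 nodes to tolerate f crashed nodes, while a BFT protocol (e.g. PBFT) needs 3f+1 nodes to tolerate f arbitrarily misbehaving ones. A minimal sketch in illustrative Python, not anything Kafka- or ZooKeeper-specific:

```python
# Minimal illustration (not ZooKeeper/Kafka code) of the standard
# fault-tolerance bounds behind this discussion.

def cft_cluster_size(f):
    """Minimum nodes for a crash-fault-tolerant protocol (e.g. ZAB,
    Raft) to tolerate f crashed nodes: a majority must survive."""
    return 2 * f + 1

def bft_cluster_size(f):
    """Minimum nodes for a Byzantine-fault-tolerant protocol
    (e.g. PBFT) to tolerate f arbitrarily misbehaving nodes."""
    return 3 * f + 1

# Tolerating one faulty node:
assert cft_cluster_size(1) == 3  # the usual 3-node ZooKeeper ensemble
assert bft_cluster_size(1) == 4  # a BFT ordering service needs 4 nodes
```

The widening gap as f grows is part of why BFT consensus costs noticeably more hardware and messaging than crash-fault tolerance.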

> Am 27.08.2019 um 06:36 schrieb Nayak, Soumya R. :
> 
> Hi Team,
> 
> Currently Zookeeper and Kafka cluster are Crash Fault Tolerant.
> Zookeeper uses a version of Paxos - Zookeeper atomic broadcast. Is there any 
> plan in future or current in progress where zookeeper will be implemented 
> with a BFT algorithm. This might help to have a more stable distributed 
> environment when we have the cluster across different machines.
> 
> Regards,
> Soumya
> 


kafka client 2.0.1 compatibility with kafka broker 0.10.0.1

2019-08-26 Thread Upendra Yadav
Hi,

We have Kafka brokers running version 0.10.0.1, and Brooklin
(https://github.com/linkedin/brooklin) uses Kafka client 2.0.1.
I'm testing Brooklin for Kafka mirroring.

Given the version mismatch, do you see any issue using Brooklin's Kafka
mirror with our Kafka brokers?

Or, in general: do you see any issue using Kafka client 2.0.1 with Kafka
broker 0.10.0.1?

I went through the links below:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-35+-+Retrieving+protocol+version
https://cwiki.apache.org/confluence/display/KAFKA/KIP-97%3A+Improved+Kafka+Client+RPC+Compatibility+Policy

but I'm not sure whether Brooklin uses any newer APIs that are not
supported by Kafka broker 0.10.0.1.

If someone has time, please look into this and let me know.
Thank you so much...
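For background on the two KIPs linked above: since KIP-35 the broker advertises, per API key, the version range it supports, and a newer client downgrades its requests accordingly; compatibility only breaks when the client needs an API or version the broker does not have. A hypothetical sketch of that negotiation (the API names and version ranges below are made up for illustration, not real broker output):

```python
# Hypothetical sketch of KIP-35-style version negotiation; the version
# numbers here are invented for illustration, not taken from a real broker.
broker_versions = {            # api -> (min, max) advertised by the broker
    "Produce": (0, 2),
    "Fetch": (0, 3),
    "Metadata": (0, 2),
}
client_versions = {            # api -> (min, max) the client can speak
    "Produce": (0, 6),
    "Fetch": (0, 8),
    "Metadata": (0, 6),
    "CreateTopics": (0, 3),    # an API an old broker may not know at all
}

def negotiate(api):
    """Highest mutually supported version, or None if incompatible."""
    if api not in broker_versions:
        return None
    b_min, b_max = broker_versions[api]
    c_min, c_max = client_versions[api]
    best = min(b_max, c_max)
    return best if best >= max(b_min, c_min) else None

assert negotiate("Fetch") == 3             # the client downgrades to v3
assert negotiate("CreateTopics") is None   # the feature is unavailable
```

In practice, recent Kafka distributions ship a kafka-broker-api-versions.sh script that prints what a live broker actually advertises, which is a quick way to check a specific client feature against an old broker.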


Byzantine Fault Tolerance Implementation

2019-08-26 Thread Nayak, Soumya R.
Hi Team,

Currently, ZooKeeper and Kafka clusters are crash fault tolerant.
ZooKeeper uses a Paxos-like protocol, ZooKeeper Atomic Broadcast (ZAB). 
Is there any plan, current or future, to implement ZooKeeper with a BFT 
algorithm? This might help to provide a more stable distributed environment 
when the cluster spans different machines.

Regards,
Soumya



Re: Kafka Streams and broker compatibility

2019-08-26 Thread Alisson Sales
Awesome, thanks for clarifying :)

On Tue, Aug 27, 2019 at 1:08 PM Guozhang Wang  wrote:

> Right, the fix itself actually adds headers even if there were none in 
> the source topics, and hence causes old-versioned brokers to fail. But 
> theoretically speaking, as long as the Streams clients are on version 
> 0.11.0+, the brokers should also be on 0.11.0+ for various features that 
> may require the higher message format (EOS, suppression, etc.).
>
> On Mon, Aug 26, 2019 at 5:42 PM Sophie Blee-Goldman 
> wrote:
>
> > I'm pretty sure one of the Suppress bug fixes that went into 2.2.1
> > involved adding headers. Updating the compatibility matrix must have
> > just slipped when that bugfix was merged -- thanks for bringing this up!
> >
> > On Mon, Aug 26, 2019 at 5:37 PM Alisson Sales 
> > wrote:
> >
> > > Hi Guozhang, thanks for your reply.
> > >
> > > I suspect the "problem" has to do with the fixes released on 2.2.1. I'm
> > > upgrading to this version mostly because we were facing problems with
> > > KTable suppress.
> > >
> > > I was experiencing this exact same problem:
> > >
> > >
> >
> https://stackoverflow.com/questions/54145281/why-do-the-offsets-of-the-consumer-group-app-id-of-my-kafka-streams-applicatio/54227156
> > > This was the fix: https://issues.apache.org/jira/browse/KAFKA-7895.
> > >
> > > When trying to confirm the fix worked for my topology/app I encountered
> > the
> > > issue: java.lang.IllegalArgumentException: Magic v1 does not support
> > > record.
> > >
> > > In summary the topology works fine on 0.10.2.1 with kafka-streams
> 2.2.0,
> > > but fails with the error above if I use 2.2.1.
> > >
> > > I haven't changed any part of the code, simply updated my gradle file
> > > updating the dependency.
> > >
> > > Thanks again
> > >
> > > On Tue, Aug 27, 2019 at 12:24 PM Guozhang Wang 
> > wrote:
> > >
> > > > Hello Alisson,
> > > >
> > > > The root cause you've seen is the message header support, which was
> > > > added to brokers in 0.11.0 (KIP-82) and to the Streams client in 2.0
> > > > (KIP-244). If your code does not add any headers itself, it will only
> > > > inherit the headers from source topics when writing to intermediate /
> > > > sink topics. So I think that even if you were using 2.2.0 you'd still
> > > > hit this issue if you happen to have headers in some of your source
> > > > topic messages.
> > > >
> > > > I've updated
> > > > https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix
> > > > accordingly.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > > On Thu, Aug 22, 2019 at 4:12 PM Alisson Sales <
> alisson.sa...@gmail.com
> > >
> > > > wrote:
> > > >
> > > > > Hi all, I've just upgraded a project that was using kafka-streams
> > 2.2.0
> > > > to
> > > > > 2.2.1 and found the following error at the end.
> > > > >
> > > > > I was using Kafka Broker 0.10.2.1 and after upgrading the Broker to
> > > 0.11
> > > > > the error doesn't happen anymore.
> > > > >
> > > > > My question here is: where is the best place we can find the
> required
> > > > > minimum broker version for the kafka-streams version one is using?
> > > > >
> > > > > This is not clear to me and the
> > > > >
> > https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix
> > > > > wiki
> > > > > page seems outdated.
> > > > >
> > > > > Thanks in advance
> > > > >
> > > > > Exception in thread
> > > > >
> > >
> "streams-batch-opens-f5d8fdb1-db8b-415b-b812-ba448fac3dfa-StreamThread-1"
> > > > > org.apache.kafka.streams.errors.ProcessorStateException: task [1_2]
> > > > Failed
> > > > > to flush state store KTABLE-SUPPRESS-STATE-STORE-09
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:251)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:204)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:521)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:473)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:461)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:286)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:412)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1056)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:910)
> > > > > at
> > > > >
> > >

Re: Kafka Streams and broker compatibility

2019-08-26 Thread Guozhang Wang
Right, the fix itself actually adds headers even if there were none in the
source topics, and hence causes old-versioned brokers to fail. But
theoretically speaking, as long as the Streams clients are on version
0.11.0+, the brokers should also be on 0.11.0+ for various features that may
require the higher message format (EOS, suppression, etc.).

On Mon, Aug 26, 2019 at 5:42 PM Sophie Blee-Goldman 
wrote:

> I'm pretty sure one of the Suppress bug fixes that went into 2.2.1 involved
> adding headers. Updating the compatibility matrix must have just slipped
> when that bugfix was merged -- thanks for bringing this up!
>
> On Mon, Aug 26, 2019 at 5:37 PM Alisson Sales 
> wrote:
>
> > Hi Guozhang, thanks for your reply.
> >
> > I suspect the "problem" has to do with the fixes released on 2.2.1. I'm
> > upgrading to this version mostly because we were facing problems with
> > KTable suppress.
> >
> > I was experiencing this exact same problem:
> >
> >
> https://stackoverflow.com/questions/54145281/why-do-the-offsets-of-the-consumer-group-app-id-of-my-kafka-streams-applicatio/54227156
> > This was the fix: https://issues.apache.org/jira/browse/KAFKA-7895.
> >
> > When trying to confirm the fix worked for my topology/app I encountered
> the
> > issue: java.lang.IllegalArgumentException: Magic v1 does not support
> > record.
> >
> > In summary the topology works fine on 0.10.2.1 with kafka-streams 2.2.0,
> > but fails with the error above if I use 2.2.1.
> >
> > I haven't changed any part of the code, simply updated my gradle file
> > updating the dependency.
> >
> > Thanks again
> >
> > On Tue, Aug 27, 2019 at 12:24 PM Guozhang Wang 
> wrote:
> >
> > > Hello Alisson,
> > >
> > > The root cause you've seen is the message header support, which was
> > > added to brokers in 0.11.0 (KIP-82) and to the Streams client in 2.0
> > > (KIP-244). If your code does not add any headers itself, it will only
> > > inherit the headers from source topics when writing to intermediate /
> > > sink topics. So I think that even if you were using 2.2.0 you'd still
> > > hit this issue if you happen to have headers in some of your source
> > > topic messages.
> > >
> > > I've updated
> > > https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix
> > > accordingly.
> > >
> > >
> > > Guozhang
> > >
> > > On Thu, Aug 22, 2019 at 4:12 PM Alisson Sales  >
> > > wrote:
> > >
> > > > Hi all, I've just upgraded a project that was using kafka-streams
> 2.2.0
> > > to
> > > > 2.2.1 and found the following error at the end.
> > > >
> > > > I was using Kafka Broker 0.10.2.1 and after upgrading the Broker to
> > 0.11
> > > > the error doesn't happen anymore.
> > > >
> > > > My question here is: where is the best place we can find the required
> > > > minimum broker version for the kafka-streams version one is using?
> > > >
> > > > This is not clear to me and the
> > > >
> https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix
> > > > wiki
> > > > page seems outdated.
> > > >
> > > > Thanks in advance
> > > >
> > > > Exception in thread
> > > >
> > "streams-batch-opens-f5d8fdb1-db8b-415b-b812-ba448fac3dfa-StreamThread-1"
> > > > org.apache.kafka.streams.errors.ProcessorStateException: task [1_2]
> > > Failed
> > > > to flush state store KTABLE-SUPPRESS-STATE-STORE-09
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:251)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:204)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:521)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:473)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:461)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:286)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:412)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1056)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:910)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:804)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:773)
> > > > Caused by: org.apache.kafka.streams.errors.StreamsException: task
> [1_2]
> > > > Abort sending since an error caught with a previous record (key
> > > > A:2019-03-10T07:00:00Z\x00\x00\x01l\xBB\x8FZ\xB0 value [B@c28e8db
>

Re: Kafka Streams and broker compatibility

2019-08-26 Thread Sophie Blee-Goldman
I'm pretty sure one of the Suppress bug fixes that went into 2.2.1 involved
adding headers. Updating the compatibility matrix must have just slipped
when that bugfix was merged -- thanks for bringing this up!

On Mon, Aug 26, 2019 at 5:37 PM Alisson Sales 
wrote:

> Hi Guozhang, thanks for your reply.
>
> I suspect the "problem" has to do with the fixes released on 2.2.1. I'm
> upgrading to this version mostly because we were facing problems with
> KTable suppress.
>
> I was experiencing this exact same problem:
>
> https://stackoverflow.com/questions/54145281/why-do-the-offsets-of-the-consumer-group-app-id-of-my-kafka-streams-applicatio/54227156
> This was the fix: https://issues.apache.org/jira/browse/KAFKA-7895.
>
> When trying to confirm the fix worked for my topology/app I encountered the
> issue: java.lang.IllegalArgumentException: Magic v1 does not support
> record.
>
> In summary the topology works fine on 0.10.2.1 with kafka-streams 2.2.0,
> but fails with the error above if I use 2.2.1.
>
> I haven't changed any part of the code, simply updated my gradle file
> updating the dependency.
>
> Thanks again
>
> On Tue, Aug 27, 2019 at 12:24 PM Guozhang Wang  wrote:
>
> > Hello Alisson,
> >
> > The root cause you've seen is the message header support, which was
> > added to brokers in 0.11.0 (KIP-82) and to the Streams client in 2.0
> > (KIP-244). If your code does not add any headers itself, it will only
> > inherit the headers from source topics when writing to intermediate /
> > sink topics. So I think that even if you were using 2.2.0 you'd still
> > hit this issue if you happen to have headers in some of your source
> > topic messages.
> >
> > I've updated
> > https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix
> > accordingly.
> >
> >
> > Guozhang
> >
> > On Thu, Aug 22, 2019 at 4:12 PM Alisson Sales 
> > wrote:
> >
> > > Hi all, I've just upgraded a project that was using kafka-streams 2.2.0
> > to
> > > 2.2.1 and found the following error at the end.
> > >
> > > I was using Kafka Broker 0.10.2.1 and after upgrading the Broker to
> 0.11
> > > the error doesn't happen anymore.
> > >
> > > My question here is: where is the best place we can find the required
> > > minimum broker version for the kafka-streams version one is using?
> > >
> > > This is not clear to me and the
> > > https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix
> > > wiki
> > > page seems outdated.
> > >
> > > Thanks in advance
> > >
> > > Exception in thread
> > >
> "streams-batch-opens-f5d8fdb1-db8b-415b-b812-ba448fac3dfa-StreamThread-1"
> > > org.apache.kafka.streams.errors.ProcessorStateException: task [1_2]
> > Failed
> > > to flush state store KTABLE-SUPPRESS-STATE-STORE-09
> > > at
> > >
> > >
> >
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:251)
> > > at
> > >
> > >
> >
> org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:204)
> > > at
> > >
> > >
> >
> org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:521)
> > > at
> > >
> > >
> >
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:473)
> > > at
> > >
> > >
> >
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:461)
> > > at
> > >
> > >
> >
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:286)
> > > at
> > >
> > >
> >
> org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:412)
> > > at
> > >
> > >
> >
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1056)
> > > at
> > >
> > >
> >
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:910)
> > > at
> > >
> > >
> >
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:804)
> > > at
> > >
> > >
> >
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:773)
> > > Caused by: org.apache.kafka.streams.errors.StreamsException: task [1_2]
> > > Abort sending since an error caught with a previous record (key
> > > A:2019-03-10T07:00:00Z\x00\x00\x01l\xBB\x8FZ\xB0 value [B@c28e8db
> > > timestamp
> > > null) to topic
> > > streams-batch-opens-KTABLE-SUPPRESS-STATE-STORE-09-changelog
> due
> > to
> > > java.lang.IllegalArgumentException: Magic v1 does not support record
> > > headers
> > > at
> > >
> > >
> >
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:244)
> > > at
> > >
> > >
> >
> org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.logValue(InMemoryTimeOrderedKeyValueBuffer.java:284)
> > > at
> > >
> > >
> >
> org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.flush(InMemoryTimeOrderedKeyValueBuffer.java:266)
> > > at
> > >
> > >
> >
> org.apache.kafka.streams.processor.internals.P

Re: Kafka Streams and broker compatibility

2019-08-26 Thread Alisson Sales
Hi Guozhang, thanks for your reply.

I suspect the "problem" has to do with the fixes released on 2.2.1. I'm
upgrading to this version mostly because we were facing problems with
KTable suppress.

I was experiencing this exact same problem:
https://stackoverflow.com/questions/54145281/why-do-the-offsets-of-the-consumer-group-app-id-of-my-kafka-streams-applicatio/54227156
This was the fix: https://issues.apache.org/jira/browse/KAFKA-7895.

When trying to confirm the fix worked for my topology/app, I encountered
this issue: java.lang.IllegalArgumentException: Magic v1 does not support
record headers.

In summary the topology works fine on 0.10.2.1 with kafka-streams 2.2.0,
but fails with the error above if I use 2.2.1.

I haven't changed any part of the code, simply updated my gradle file
updating the dependency.

Thanks again

On Tue, Aug 27, 2019 at 12:24 PM Guozhang Wang  wrote:

> Hello Alisson,
>
> The root cause you've seen is the message header support, which was added
> to brokers in 0.11.0 (KIP-82) and to the Streams client in 2.0 (KIP-244).
> If your code does not add any headers itself, it will only inherit the
> headers from source topics when writing to intermediate / sink topics. So
> I think that even if you were using 2.2.0 you'd still hit this issue if
> you happen to have headers in some of your source topic messages.
>
> I've updated
> https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix
> accordingly.
>
>
> Guozhang
>
> On Thu, Aug 22, 2019 at 4:12 PM Alisson Sales 
> wrote:
>
> > Hi all, I've just upgraded a project that was using kafka-streams 2.2.0
> to
> > 2.2.1 and found the following error at the end.
> >
> > I was using Kafka Broker 0.10.2.1 and after upgrading the Broker to 0.11
> > the error doesn't happen anymore.
> >
> > My question here is: where is the best place we can find the required
> > minimum broker version for the kafka-streams version one is using?
> >
> > This is not clear to me and the
> > https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix
> > wiki
> > page seems outdated.
> >
> > Thanks in advance
> >
> > Exception in thread
> > "streams-batch-opens-f5d8fdb1-db8b-415b-b812-ba448fac3dfa-StreamThread-1"
> > org.apache.kafka.streams.errors.ProcessorStateException: task [1_2]
> Failed
> > to flush state store KTABLE-SUPPRESS-STATE-STORE-09
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:251)
> > at
> >
> >
> > org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:204)
> > at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:521)
> > at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:473)
> > at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:461)
> > at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:286)
> > at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:412)
> > at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1056)
> > at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:910)
> > at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:804)
> > at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:773)
> > Caused by: org.apache.kafka.streams.errors.StreamsException: task [1_2]
> > Abort sending since an error caught with a previous record (key
> > A:2019-03-10T07:00:00Z\x00\x00\x01l\xBB\x8FZ\xB0 value [B@c28e8db timestamp
> > null) to topic streams-batch-opens-KTABLE-SUPPRESS-STATE-STORE-09-changelog due to
> > java.lang.IllegalArgumentException: Magic v1 does not support record headers
> > at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:244)
> > at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.logValue(InMemoryTimeOrderedKeyValueBuffer.java:284)
> > at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.flush(InMemoryTimeOrderedKeyValueBuffer.java:266)
> > at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:248)
> > ... 10 more
> > Caused by: java.lang.IllegalArgumentException: Magic v1 does not support
> > record headers
> > at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:412)
> > at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:451)
> > at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:508)
> > at org.apache.kafka.common.record.MemoryRecor

Re: Kafka Streams and broker compatibility

2019-08-26 Thread Guozhang Wang
Hello Alisson,

The root cause you've seen is message header support, which was added to
brokers in 0.11.0 (KIP-82) and to the Streams client in 2.0 (KIP-244). If
your code does not add any headers itself, Streams will still inherit the
headers from source topics when writing to intermediate / sink topics. So
even if you were using 2.2.0 you would still hit this issue if some of your
source topic messages happen to contain headers.

I've updated
https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix
accordingly.


Guozhang

On Thu, Aug 22, 2019 at 4:12 PM Alisson Sales 
wrote:

> Hi all, I've just upgraded a project that was using kafka-streams 2.2.0 to
> 2.2.1 and found the following error at the end.
>
> I was using Kafka Broker 0.10.2.1 and after upgrading the Broker to 0.11
> the error doesn't happen anymore.
>
> My question here is: where is the best place we can find the required
> minimum broker version for the kafka-streams version one is using?
>
> This is not clear to me and the
> https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix
> wiki
> page seems outdated.
>
> Thanks in advance
>
> Exception in thread
> "streams-batch-opens-f5d8fdb1-db8b-415b-b812-ba448fac3dfa-StreamThread-1"
> org.apache.kafka.streams.errors.ProcessorStateException: task [1_2] Failed
> to flush state store KTABLE-SUPPRESS-STATE-STORE-09
> at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:251)
> at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:204)
> at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:521)
> at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:473)
> at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:461)
> at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:286)
> at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:412)
> at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1056)
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:910)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:804)
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:773)
> Caused by: org.apache.kafka.streams.errors.StreamsException: task [1_2]
> Abort sending since an error caught with a previous record (key
> A:2019-03-10T07:00:00Z\x00\x00\x01l\xBB\x8FZ\xB0 value [B@c28e8db timestamp
> null) to topic streams-batch-opens-KTABLE-SUPPRESS-STATE-STORE-09-changelog due to
> java.lang.IllegalArgumentException: Magic v1 does not support record
> headers
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:244)
> at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.logValue(InMemoryTimeOrderedKeyValueBuffer.java:284)
> at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.flush(InMemoryTimeOrderedKeyValueBuffer.java:266)
> at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:248)
> ... 10 more
> Caused by: java.lang.IllegalArgumentException: Magic v1 does not support
> record headers
> at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:412)
> at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:451)
> at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:508)
> at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:531)
> at org.apache.kafka.clients.producer.internals.ProducerBatch.tryAppend(ProducerBatch.java:106)
> at org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:224)
> at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:907)
> at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:846)
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:167)
>


-- 
-- Guozhang


Re: Question: Kafka as a message queue for long running tasks

2019-08-26 Thread Raman Gupta
Gerard, we have a similar use case we are using Kafka for, and are
setting max.poll.interval.ms to a large value in order to handle the
worst-case scenario.

Rebalancing is indeed a big problem with this approach (and not just
for "new" consumers as you mentioned -- adding consumers causes a
stop-the-world rebalance on all existing consumers as well). The
static group membership protocol introduced in Kafka 2.3 helps quite a
lot with the failed-and-restarting consumer case (though I ran into
several bugs with it; one stream still won't start in dev, but it is
working in prod now with patched Kafka brokers), but
does not handle scaling your consumers up (or down). Scaling consumers
is still a stop-the-world rebalance, and you can expect it to take at
least the max time any particular in-flight message takes to process
(if all goes well), because your consumers will essentially be sitting
around idle waiting for every other consumer to finish. Theoretically,
the incremental rebalancing capabilities coming out soon (KAFKA-8179)
should help resolve this.
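For reference, the combination described above (a large `max.poll.interval.ms` plus static membership) boils down to plain consumer configuration. A minimal sketch, with made-up broker address, group names, and timeout values that would need tuning to the actual processing times:

```java
import java.util.Properties;

public class LongTaskConsumerConfig {
    /** Consumer properties for long-running tasks (hypothetical values). */
    public static Properties build(String instanceId) {
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        p.put("group.id", "long-task-workers");       // hypothetical group name
        // Static membership (Kafka 2.3+): a consumer that restarts with the
        // same group.instance.id rejoins without a stop-the-world rebalance,
        // as long as it comes back within session.timeout.ms.
        p.put("group.instance.id", instanceId);
        p.put("session.timeout.ms", "120000");
        // Cover the worst-case per-message processing time (~15 min + 50%).
        p.put("max.poll.interval.ms", "1800000");
        // Fetch one record per poll so a single slow message never delays
        // committing a whole batch.
        p.put("max.poll.records", "1");
        p.put("enable.auto.commit", "false");
        return p;
    }
}
```

These properties would then be passed to `new KafkaConsumer<>(LongTaskConsumerConfig.build("worker-1"))`.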

The other issue you will run into with long processing is "hot
partitions/consumers" -- with a large backlog of such messages it is
inevitable that some partitions will take much longer to
process than others, which means *some* of
your consumers will just be sitting idle, while others are lagging
*way* behind. There is no easy solution to this that I have found
because one partition is always processed by at most one Kafka
consumer. You'll have to work around this by reading a bunch of
messages, distributing their work yourself, tracking completions
outside of Kafka, and then committing offsets back to Kafka manually.
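One way to sketch the "track completions outside of Kafka, commit manually" bookkeeping is a small per-partition helper (hypothetical code, not a Kafka API). It relies on the fact that a committed offset means "next record to consume", so the safe commit point is the first offset whose processing has not yet finished:

```java
import java.util.TreeSet;

/** Per-partition completion tracker for out-of-order message processing. */
public class OffsetTracker {
    private long nextToCommit;                            // first offset not yet completed
    private final TreeSet<Long> completedAhead = new TreeSet<>();

    public OffsetTracker(long firstOffset) {
        this.nextToCommit = firstOffset;
    }

    /** Record that the message at this offset finished processing. */
    public void complete(long offset) {
        completedAhead.add(offset);
        // Advance the commit point over any contiguous run of completions.
        while (completedAhead.remove(nextToCommit)) {
            nextToCommit++;
        }
    }

    /** Offset safe to commit: every offset below it has been processed. */
    public long safeCommitOffset() {
        return nextToCommit;
    }
}
```

The consumer loop would call `complete(record.offset())` as workers finish, and periodically pass `new OffsetAndMetadata(tracker.safeCommitOffset())` for each partition to `commitSync`.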

Honestly, because of these issues, right now I am seriously
considering a migration of our Kafka workloads to Apache Pulsar, which
supports the same semantics as Kafka, but also supports
work/message-queue style messaging *much* better, as it uses
per-message acknowledgement, and subscriptions are not limited to the
number of partitions i.e. you can have N partitions, and M+N
consumers.

Regards,
Raman


On Thu, Jun 13, 2019 at 1:54 PM Mark Anderson  wrote:
>
> We have a different use case where we stop consuming due to connection to
> an external system being down.
>
> In this case we sleep for the same period as our poll timeout would be and
> recommit the previous offset. This stops the consumer going stale and
> avoids increasing the max interval.
>
> Perhaps you could do something similar?
>
> Mark
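A variation of Mark's keep-alive idea can be sketched with pause()/resume() instead of sleeping and recommitting (a sketch assuming the Java `kafka-clients` API, a running broker, and a hypothetical `ExternalSystem` health check): a poll() over paused partitions fetches nothing but still counts as progress against `max.poll.interval.ms`, so the consumer stays in the group without advancing any offsets.

```java
import java.time.Duration;
import java.util.Set;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class KeepAliveWhileDown {
    /** Hypothetical external dependency. */
    interface ExternalSystem { boolean isUp(); }

    // While the external system is down, pause all assigned partitions and
    // keep calling poll() so the consumer is not considered stale.
    static <K, V> void waitForExternalSystem(KafkaConsumer<K, V> consumer,
                                             ExternalSystem external) {
        Set<TopicPartition> assigned = consumer.assignment();
        consumer.pause(assigned);
        while (!external.isUp()) {
            consumer.poll(Duration.ofSeconds(5)); // returns empty while paused
        }
        consumer.resume(assigned);
    }
}
```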
>
>
> On Thu, 13 Jun 2019, 17:49 Murphy, Gerard,  wrote:
>
> > Hi,
> >
> > I am wondering if there is something I am missing about my set up to
> > facilitate long running jobs.
> >
> > For my purposes it is ok to have `At most once` message delivery, this
> > means it is not required to think about committing offsets (or at least it
> > is ok to commit each message offset upon receiving it).
> >
> > I have the following in order to achieve the competing consumer pattern:
> >
> >   *   A topic
> >   *   X consumers in the same group
> >   *   P partitions in a topic (where P >= X always)
> >
> > My problem is that I have messages that can take ~15 minutes to process
> > (though this may fluctuate by up to 50%, let's say). To avoid
> > consumers having their partition assignments revoked I have increased the
> > value of `max.poll.interval.ms` to reflect this.
> > However this comes with some negative consequences:
> >
> >   *   if some message exceeds this length of time then, in a worst-case
> > scenario, the consumer processing this message will have to wait up to the
> > value of `max.poll.interval.ms` for a rebalance
> >   *   if I need to scale and increase the number of consumers based on
> > load then any new consumers might also have to wait the value of `
> > max.poll.interval.ms` for a rebalance to occur in order to process any
> > messages
> >
> > As it stands at the moment I see that I can proceed as follows:
> >
> >   *   Set `max.poll.interval.ms` to be a small value and accept that
> > every consumer processing every message will time out and go through the
> > process of having assignments revoked and waiting a small amount of time
> > for a rebalance
> >
> > However I do not like this, and am considering looking at alternative
> > technology for my message queue as I do not see any obvious way around this.
> > Admittedly I am new to Kafka, and it is just a gut feeling that the above
> > is not desirable.
> > I have used RabbitMQ in the past for these scenarios, however we need
> > Kafka in our architecture for other purposes at the moment and it would be
> > nice not to have to introduce another technology if Kafka can achieve this.
> >
> > I appreciate any advice that anybody can offer on this subject.
> >
> > Regards,
> > Ger
> >


Re: KafkaStreams - impact of retention on repartition topics

2019-08-26 Thread Murilo Tavares
Cool! Thank you Matthias!


On Sun, 25 Aug 2019 at 15:11, Matthias J. Sax  wrote:

> You cannot delete arbitrary data, however, it's possible to send a
> "truncate request" to brokers, to delete data before the retention time
> is reached:
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%3A+Add+deleteRecordsBefore%28%29+API+in+AdminClient
>
> There is `AdminClient#deleteRecords(...)` API to do so.
>
>
> -Matthias
>
> On 8/21/19 9:09 PM, Murilo Tavares wrote:
> > Thanks Matthias for the prompt response.
> > Now, just out of curiosity, how does that work? I thought it was not
> > possible to easily delete topic data...
> >
> >
> > On Wed, Aug 21, 2019 at 4:51 PM Matthias J. Sax 
> > wrote:
> >
> >> No need to worry about this.
> >>
> >> Kafka Streams uses "purge data" calls to actively delete data from
> >> those topics after the records are processed. Hence, those topics won't
> >> grow unbounded but are "truncated" on a regular basis.
> >>
> >>
> >> -Matthias
> >>
> >> On 8/21/19 11:38 AM, Murilo Tavares wrote:
> >>> Hi
> >>> I have a complex KafkaStreams topology, where I have a bunch of KTables
> >>> that I regroup (rekeying) and aggregate so I can join them.
> >>> I've noticed that the "-repartition" topics created by the groupBy
> >>> operations have a very long retention by default (Long.MAX_VALUE).
> >>> I'm a bit concerned about the size of these topics, as they will retain
> >>> data forever. I wonder why the retention is so long, and what the impact
> >>> of reducing it would be?
> >>> Thanks
> >>> Murilo
> >>>
> >>
> >>
> >
>
>
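Matthias's `AdminClient#deleteRecords(...)` suggestion can be sketched as follows. This is a hypothetical example: the topic name, partition, and offset are made up, and it assumes a running broker plus the `kafka-clients` dependency (1.1+ for KIP-107):

```java
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.common.TopicPartition;

public class TruncateRepartitionTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption
        try (AdminClient admin = AdminClient.create(props)) {
            // Delete everything before offset 1000 in partition 0 of a
            // hypothetical repartition topic (the KIP-107 "truncate request").
            TopicPartition tp = new TopicPartition("my-app-repartition", 0);
            Map<TopicPartition, RecordsToDelete> request =
                Collections.singletonMap(tp, RecordsToDelete.beforeOffset(1000L));
            admin.deleteRecords(request).all().get();
        }
    }
}
```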