Byzantine Fault Tolerance Implementation
Thanks Boyang. Will check that link and update there.

Regards,
Soumya
Re: Byzantine Fault Tolerance Implementation
Hey Nayak,

There is an ongoing KIP in the community about removing ZooKeeper:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-500%3A+Replace+ZooKeeper+with+a+Self-Managed+Metadata+Quorum

It should be a good place to raise your question about making the consensus algorithm pluggable in the future.

Boyang
Byzantine Fault Tolerance Implementation
Hi Jorn,

I was asking in the context of Hyperledger Fabric Blockchain, where a Kafka/ZooKeeper cluster is used and multiple orgs may take part in the network and its transactions. There, a single failed system or a malicious node might disrupt the whole network, which would cost a lot. On that aspect, I was asking whether it is possible to have a pluggable consensus algorithm for ZooKeeper.

Regards,
Soumya
Re: Byzantine Fault Tolerance Implementation
What kind of stability problems do you have? It is surprising to me that you have them, and it is unlikely that you have them due to a specific consensus algorithm. If you have stability issues, then I would look at your architecture for weak spots.

Btw, Paxos is a consensus mechanism. BFT just describes a specific type of failure in distributed systems, so "implementing BFT" probably does not make sense as stated.
kafka client 2.0.1 compatibility with kafka broker 0.10.0.1
Hi,

We have Kafka brokers running version 0.10.0.1, and Brooklin (https://github.com/linkedin/brooklin) uses Kafka client 2.0.1. I'm testing Brooklin for Kafka mirroring. Given the version mismatch, do you see any issue using Brooklin's Kafka mirror with our Kafka brokers? Or, in general: do you see any issue using Kafka client 2.0.1 with Kafka broker 0.10.0.1?

I went through the links below:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-35+-+Retrieving+protocol+version
https://cwiki.apache.org/confluence/display/KAFKA/KIP-97%3A+Improved+Kafka+Client+RPC+Compatibility+Policy

but I'm not sure whether Brooklin uses any new APIs that are not supported by Kafka broker 0.10.0.1. If someone has time, please look into this and let me know. Thank you so much.
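The KIP-35/KIP-97 links above describe the mechanism that makes this mixed-version setup possible: a 0.10.2+ client sends an ApiVersionsRequest and then, for each API key, speaks the highest version both sides support. The sketch below illustrates that negotiation in Python; the version tables are hypothetical placeholders, not the real client/broker tables.

```python
# Illustrative sketch of per-API version negotiation (KIP-35 / KIP-97).
# The client intersects its supported version range with the broker's
# advertised range for each API key, and uses the highest common version.
# API names and version numbers below are made up for illustration.

def negotiate(client_versions, broker_versions):
    """Pick, per API key, the highest version both sides support.

    Each table maps api_key -> (min_version, max_version).
    Raises if an API has no overlapping version range.
    """
    chosen = {}
    for api, (cmin, cmax) in client_versions.items():
        if api not in broker_versions:
            continue  # client must simply avoid APIs the broker lacks
        bmin, bmax = broker_versions[api]
        lo, hi = max(cmin, bmin), min(cmax, bmax)
        if lo > hi:
            raise RuntimeError(f"no usable version for API {api}")
        chosen[api] = hi
    return chosen

# Hypothetical tables: a newer client against an older broker.
client = {"Produce": (0, 6), "Fetch": (0, 8), "Metadata": (0, 6)}
broker = {"Produce": (0, 2), "Fetch": (0, 2), "Metadata": (0, 1)}

print(negotiate(client, broker))
# -> {'Produce': 2, 'Fetch': 2, 'Metadata': 1}: the client falls back
#    to the old broker's maximum supported versions.
```

The practical consequence is the one KIP-97 documents: a 2.0.1 client can talk to a 0.10.0.1 broker for the core produce/consume path, but any client feature that depends on an API or message format the old broker never implemented remains unavailable.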
Byzantine Fault Tolerance Implementation
Hi Team,

Currently the ZooKeeper and Kafka cluster are crash fault tolerant. ZooKeeper uses a Paxos-like protocol, ZooKeeper Atomic Broadcast (ZAB). Is there any plan, future or currently in progress, for ZooKeeper to be implemented with a BFT algorithm? This might help to have a more stable distributed environment when we have the cluster across different machines.

Regards,
Soumya

**
This message may contain confidential or proprietary information intended only for the use of the addressee(s) named above or may contain information that is legally privileged. If you are not the intended addressee, or the person responsible for delivering it to the intended addressee, you are hereby notified that reading, disseminating, distributing or copying this message is strictly prohibited. If you have received this message by mistake, please immediately notify us by replying to the message and delete the original message and any copies immediately thereafter.

If you received this email as a commercial message and would like to opt out of future commercial messages, please let us know and we will remove you from our distribution list.

Thank you.~
**

FAFLD
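To make the gap between crash fault tolerance and Byzantine fault tolerance concrete, the standard quorum-size arithmetic can be sketched: crash-tolerant protocols such as ZAB or Raft need 2f+1 replicas to tolerate f crashed nodes, while classic BFT protocols such as PBFT need 3f+1 replicas to tolerate f arbitrarily misbehaving (possibly malicious) nodes. This is a textbook illustration, not ZooKeeper code.

```python
# Minimum replica counts to tolerate f faulty nodes under the two
# standard fault models. Crash fault tolerance (ZAB/Raft-style) needs a
# majority quorum: 2f + 1 nodes. Byzantine fault tolerance (PBFT-style)
# needs enough honest overlap between quorums: 3f + 1 nodes.

def min_cluster_size(f, byzantine=False):
    """Smallest cluster that survives f faulty nodes."""
    return 3 * f + 1 if byzantine else 2 * f + 1

for f in (1, 2):
    print(f"tolerate {f}: crash={min_cluster_size(f)}, "
          f"byzantine={min_cluster_size(f, byzantine=True)}")
# tolerate 1: crash=3, byzantine=4
# tolerate 2: crash=5, byzantine=7
```

This is part of why a BFT ZooKeeper would not be a drop-in change: beyond the protocol itself, every deployment would need roughly 50% more nodes for the same fault budget.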
Re: Kafka Streams and broker compatibility
Awesome, thanks for clarifying :)
Re: Kafka Streams and broker compatibility
Right, the fix itself actually adds headers even if there were none in the source topics, and hence causes old versioned brokers to fail. But theoretically speaking, as long as the streams clients are version 0.11.0+, the broker version should be 0.11.0+ as well for the various features that may require a higher message format (EOS, suppression, etc.).
Re: Kafka Streams and broker compatibility
I'm pretty sure one of the Suppress bug fixes that went into 2.2.1 involved adding headers. Updating the compatibility matrix must have just slipped when that bugfix was merged -- thanks for bringing this up!
Re: Kafka Streams and broker compatibility
Hi Guozhang, thanks for your reply.

I suspect the "problem" has to do with the fixes released in 2.2.1. I'm upgrading to this version mostly because we were facing problems with KTable suppress.

I was experiencing this exact same problem:
https://stackoverflow.com/questions/54145281/why-do-the-offsets-of-the-consumer-group-app-id-of-my-kafka-streams-applicatio/54227156
This was the fix: https://issues.apache.org/jira/browse/KAFKA-7895.

When trying to confirm the fix worked for my topology/app, I encountered the issue: java.lang.IllegalArgumentException: Magic v1 does not support record headers.

In summary, the topology works fine on 0.10.2.1 with kafka-streams 2.2.0, but fails with the error above if I use 2.2.1. I haven't changed any part of the code; I simply updated the dependency in my Gradle file.

Thanks again
Re: Kafka Streams and broker compatibility
Hello Alisson, The root cause you've seen is the message header support, which is added in brokers as in 0.11.0 (KIP-82) and in streams client as in 2.0 (KIP-244). If your code does not add any more headers then it would only inherit the headers from source topics when trying to write to intermediate / sink topics. So I think that even if you were using 2.2.0 you'd still hit this issue if you happen to have headers in some of your source topic messages. I've updated https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix per the updates. Guozhang On Thu, Aug 22, 2019 at 4:12 PM Alisson Sales wrote: > Hi all, I've just upgraded a project that was using kafka-streams 2.2.0 to > 2.2.1 and found the following error at the end. > > I was using Kafka Broker 0.10.2.1 and after upgrading the Broker to 0.11 > the error doesn't happen anymore. > > My question here is: where is the best place we can find the required > minimum broker version for the kafka-streams version one is using? > > This is not clear to me and the > https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix > wiki > page seems outdated. 
> Thanks in advance
>
> Exception in thread "streams-batch-opens-f5d8fdb1-db8b-415b-b812-ba448fac3dfa-StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [1_2] Failed to flush state store KTABLE-SUPPRESS-STATE-STORE-09
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:251)
>     at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:204)
>     at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:521)
>     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:473)
>     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:461)
>     at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:286)
>     at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:412)
>     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1056)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:910)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:804)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:773)
> Caused by: org.apache.kafka.streams.errors.StreamsException: task [1_2] Abort sending since an error caught with a previous record (key A:2019-03-10T07:00:00Z\x00\x00\x01l\xBB\x8FZ\xB0 value [B@c28e8db timestamp null) to topic streams-batch-opens-KTABLE-SUPPRESS-STATE-STORE-09-changelog due to java.lang.IllegalArgumentException: Magic v1 does not support record headers
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:244)
>     at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.logValue(InMemoryTimeOrderedKeyValueBuffer.java:284)
>     at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.flush(InMemoryTimeOrderedKeyValueBuffer.java:266)
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:248)
>     ... 10 more
> Caused by: java.lang.IllegalArgumentException: Magic v1 does not support record headers
>     at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:412)
>     at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:451)
>     at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:508)
>     at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:531)
>     at org.apache.kafka.clients.producer.internals.ProducerBatch.tryAppend(ProducerBatch.java:106)
>     at org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:224)
>     at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:907)
>     at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:846)
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:167)
--
Guozhang
Re: Question: Kafka as a message queue for long running tasks
Gerard, we have a similar use case we are using Kafka for, and are setting max.poll.interval.ms to a large value in order to handle the worst-case scenario. Rebalancing is indeed a big problem with this approach (and not just for "new" consumers as you mentioned -- adding consumers causes a stop-the-world rebalance on all existing consumers as well).

The static consumer group membership introduced in Kafka 2.3 helps quite a lot with the failed-and-restarting consumer case (though I ran into several bugs with it and still have a stream that won't start in dev; it seems to be working in prod now with patched Kafka brokers), but it does not handle scaling your consumers up (or down). Scaling consumers is still a stop-the-world rebalance, and you can expect it to take at least as long as the slowest in-flight message takes to process (if all goes well), because your consumers will essentially sit idle waiting for every other consumer to finish. The incremental rebalancing capabilities coming out soon (KAFKA-8179) should help resolve this.

The other issue you will run into with long processing is "hot" partitions/consumers: with a large backlog of such messages it is inevitable that some partitions will take a lot longer to process than others, which means *some* of your consumers will just be sitting idle while others lag *way* behind. I have found no easy solution to this, because one partition is always processed by at most one Kafka consumer. You'll have to work around it by reading a batch of messages, distributing their work yourself, tracking completions outside of Kafka, and then committing offsets back to Kafka manually.
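The manual-distribution workaround described above hinges on one detail: Kafka tracks a single committed offset per partition, so after fanning work out to your own workers you may only commit up to the first still-incomplete message. A minimal sketch of that bookkeeping (the `OffsetTracker` helper is hypothetical, not part of any Kafka client):

```python
# Tracks out-of-order completions for one partition and reports the
# highest offset that is safe to commit. Kafka's commit means
# "everything below this offset is done", so we can only advance
# past a contiguous prefix of completed messages.
class OffsetTracker:
    def __init__(self, start_offset):
        self.committed = start_offset   # next offset we could commit
        self.done = set()               # completed offsets beyond `committed`

    def complete(self, offset):
        self.done.add(offset)
        # Advance over the contiguous run of completed offsets.
        while self.committed in self.done:
            self.done.remove(self.committed)
            self.committed += 1

    def committable(self):
        # This value would be passed to the consumer's commit call
        # as the offset for the partition.
        return self.committed

t = OffsetTracker(start_offset=100)
t.complete(101)          # finished out of order; can't commit past 100 yet
print(t.committable())   # → 100
t.complete(100)
print(t.committable())   # → 102 (100 and 101 both done)
```

The committed value follows Kafka's convention of "offset of the next record to consume", so committing `committable()` after a crash replays only the genuinely unfinished messages.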
Honestly, because of these issues, right now I am seriously considering a migration of our Kafka workloads to Apache Pulsar, which supports the same semantics as Kafka but also supports work/message-queue style messaging *much* better: it uses per-message acknowledgement, and subscriptions are not limited to the number of partitions, i.e. you can have N partitions and more than N consumers. Regards, Raman On Thu, Jun 13, 2019 at 1:54 PM Mark Anderson wrote: > > We have a different use case where we stop consuming due to the connection to > an external system being down. > > In this case we sleep for the same period as our poll timeout would be and > recommit the previous offset. This stops the consumer going stale and > avoids increasing the max interval. > > Perhaps you could do something similar? > > Mark > > > On Thu, 13 Jun 2019, 17:49 Murphy, Gerard, wrote: > > > Hi, > > > > I am wondering if there is something I am missing about my setup to > > facilitate long-running jobs. > > > > For my purposes it is ok to have `At most once` message delivery; this > > means it is not required to think about committing offsets (or at least it > > is ok to commit each message offset upon receiving it). > > > > I have the following in order to achieve the competing consumer pattern: > > > > * A topic > > * X consumers in the same group > > * P partitions in a topic (where P >= X always) > > > > My problem is that I have messages that can take ~15 minutes (but this may > > fluctuate by up to 50%, let's say) to process. In order to avoid > > consumers having their partition assignments revoked I have increased the > > value of `max.poll.interval.ms` to reflect this.
> > However this comes with some negative consequences: > > > > * if some message exceeds this length of time then in a worst-case > > scenario the consumer processing this message will have to wait up to the > > value of `max.poll.interval.ms` for a rebalance > > * if I need to scale and increase the number of consumers based on > > load then any new consumers might also have to wait up to the value of > > `max.poll.interval.ms` for a rebalance to occur in order to process any > > messages > > > > As it stands at the moment I see that I can proceed as follows: > > > > * Set `max.poll.interval.ms` to a small value and accept that > > every consumer processing every message will time out and go through the > > process of having assignments revoked and waiting a small amount of time > > for a rebalance > > > > However I do not like this, and am considering looking at alternative > > technology for my message queue as I do not see any obvious way around this. > > Admittedly I am new to Kafka, and it is just a gut feeling that the above > > is not desirable. > > I have used RabbitMQ in the past for these scenarios; however, we need > > Kafka in our architecture for other purposes at the moment and it would be > > nice not to have to introduce another technology if Kafka can achieve this. > > > > I appreciate any advice that anybody can offer on this subject. > > > > Regards, > > Ger > >
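One alternative to raising max.poll.interval.ms, in the spirit of Mark's suggestion above, is to pause() the assigned partitions, keep calling poll() so the consumer stays in the group, and resume() once the long task finishes. The sketch below models only that control flow; `FakeConsumer` is a hypothetical stand-in for a real consumer client (whose pause/resume/poll behavior it mirrors), since the real client needs a running broker:

```python
import time
from concurrent.futures import ThreadPoolExecutor

class FakeConsumer:
    """Hypothetical stand-in for a Kafka consumer: poll/pause/resume only."""
    def __init__(self, records):
        self._records = list(records)
        self.paused = False

    def poll(self, timeout_ms=100):
        # A real poll() keeps the group membership alive even while
        # paused, and returns no records for paused partitions.
        if self.paused or not self._records:
            return []
        return [self._records.pop(0)]

    def pause(self):
        self.paused = True

    def resume(self):
        self.paused = False

    def lag(self):
        return len(self._records)

def slow_task(record):
    time.sleep(0.01)  # stands in for minutes of processing
    return record.upper()

def run(consumer):
    results = []
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = None
        while True:
            # Always called, so max.poll.interval.ms is never exceeded.
            records = consumer.poll(timeout_ms=100)
            if records:
                consumer.pause()  # stop fetching while the long task runs
                future = pool.submit(slow_task, records[0])
            if future is not None and future.done():
                results.append(future.result())
                future = None
                consumer.resume()  # ready for the next record
            if future is None and not consumer.paused and consumer.lag() == 0:
                break
    return results

print(run(FakeConsumer(["a", "b", "c"])))  # ['A', 'B', 'C']
```

Note that real clients (the Java client, kafka-python) take explicit partition arguments in pause()/resume(); the no-argument forms here are a simplification of that API.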
Re: KafkaStreams - impact of retention on repartition topics
Cool! Thank you Matthias! On Sun, 25 Aug 2019 at 15:11, Matthias J. Sax wrote: > You cannot delete arbitrary data; however, it's possible to send a > "truncate request" to brokers to delete data before the retention time > is reached: > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%3A+Add+deleteRecordsBefore%28%29+API+in+AdminClient > > There is the `AdminClient#deleteRecords(...)` API to do so. > > > -Matthias > > On 8/21/19 9:09 PM, Murilo Tavares wrote: > > Thanks Matthias for the prompt response. > > Now just for curiosity, how does that work? I thought it was not possible > > to easily delete topic data... > > > > > > On Wed, Aug 21, 2019 at 4:51 PM Matthias J. Sax > > wrote: > > > >> No need to worry about this. > >> > >> Kafka Streams uses "purge data" calls to actively delete data from > >> those topics after the records are processed. Hence, those topics won't > >> grow unbounded but are "truncated" on a regular basis. > >> > >> > >> -Matthias > >> > >> On 8/21/19 11:38 AM, Murilo Tavares wrote: > >>> Hi > >>> I have a complex KafkaStreams topology, where I have a bunch of KTables > >>> that I regroup (rekeying) and aggregate so I can join them. > >>> I've noticed that the "-repartition" topics created by the groupBy > >>> operations have a very long retention by default (Long.MAX_VALUE). > >>> I'm a bit concerned about the size of these topics, as they will retain > >>> data forever. I wonder why the retention is so long, and what would be the > impact > >>> of reducing this retention? > >>> Thanks > >>> Murilo