Re: Redis as state store
Thanks!

On Mon, Mar 15, 2021 at 8:38 PM Sophie Blee-Goldman wrote:
> Yep, that fell off my radar. Here we go:
> https://issues.apache.org/jira/browse/KAFKA-12475
>
> On Mon, Mar 15, 2021 at 8:09 PM Guozhang Wang wrote:
> > Hey Sophie,
> >
> > Maybe we can first create a JIRA ticket for this?
> >
> > On Mon, Mar 15, 2021 at 3:09 PM Sophie Blee-Goldman wrote:
> > > Sounds good! I meant anyone who is interested :)
> > >
> > > Let me know if you have any questions after digging in to this
> > >
> > > On Mon, Mar 15, 2021 at 2:39 PM Alex Craig wrote:
> > > > Hey Sophie, not sure if you meant me or not but I'd be happy to take a
> > > > stab at creating a KIP for this. I want to spend some time digging into
> > > > more of how this works first, but will then try to gather my thoughts
> > > > and get something created.
> > > >
> > > > Alex
> > > >
> > > > On Mon, Mar 15, 2021 at 1:48 PM Sophie Blee-Goldman wrote:
> > > > > This certainly does seem like a flaw in the Streams API, although of
> > > > > course Streams is just in general not designed for use with remote
> > > > > anything (API calls, stores, etc).
> > > > >
> > > > > That said, I don't see any reason why we *couldn't* have better
> > > > > support for remote state stores. Note that there's currently no
> > > > > deleteAll method on the store interface, and not all databases
> > > > > necessarily support that. But we could add a default implementation
> > > > > which just calls delete(key) on all keys in the state store, and for
> > > > > the RocksDB-based state stores we still wipe out the state as usual
> > > > > (and recommend the same for any custom StateStore which is local
> > > > > rather than remote). Obviously falling back on individual
> > > > > delete(key) operations for all the data in the entire store will
> > > > > have worse performance, but that's better than silently breaking EOS
> > > > > when deleteAll is not available on a remote store.
> > > > >
> > > > > Would you be interested in doing a KIP for this? We should also file
> > > > > a JIRA with the above explanation, so that other users of remote
> > > > > storage are aware of this limitation. And definitely document this
> > > > > somewhere.
> > > > >
> > > > > On Mon, Mar 15, 2021 at 8:04 AM Bruno Cadonna wrote:
> > > > > > Hi Alex,
> > > > > >
> > > > > > I guess wiping out the state directory is easier code-wise,
> > > > > > faster, and/or at the time of development the developers did not
> > > > > > design for remote state stores. But I actually do not know the
> > > > > > exact reason.
> > > > > >
> > > > > > Off the top of my head, I do not know how to solve this for remote
> > > > > > state stores. Using the uncaught exception handler is not good,
> > > > > > because a computing node could fail without giving the JVM the
> > > > > > opportunity to throw an exception.
> > > > > >
> > > > > > In your tests, try to increase the commit interval to a high value
> > > > > > and see if you get inconsistencies. You should get an
> > > > > > inconsistency if the state store maintains counts for keys and,
> > > > > > after the last commit before the failure, the Streams app puts an
> > > > > > event with a new key K with value 1 into the state store. After
> > > > > > failover, Streams would put the same event with key K again into
> > > > > > the state store. If the state store deleted all of its data,
> > > > > > Streams would again put value 1, but if the state store did not
> > > > > > delete all data, Streams would put value 2, which is wrong because
> > > > > > it would count the same event twice.
> > > > > >
> > > > > > Best,
> > > > > > Bruno
> > > > > >
> > > > > > On 15.03.21 15:20, Alex Craig wrote:
> > > > > > > Bruno,
> > > > > > > Thanks for the info! That makes sense. Of course now I have more
> > > > > > > questions. :) Do you know why this is being done outside of the
> > > > > > > state store API? I assume there are reasons why a "deleteAll()"
> > > > > > > type of function wouldn't work, thereby allowing a state store
> > > > > > > to purge itself? And maybe more importantly, is there a way to
> > > > > > > achieve a similar behavior with a 3rd party store? I'm not sure
> > > > > > > if hooking into the uncaught exception handler might be a good
> > > > > > > way to purge/drop a state store in the event of a fatal error? I
> > > > > > > did set up a MongoDB state store recently as part of a POC and
> > > > > > > was testing it with EOS enabled (forcing crashes to occur and
> > > > > > > checking that the result of my aggregation was still ac
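For what it's worth, the fallback described in this thread (a default deleteAll that iterates the keys and calls delete(key) on each) can be sketched roughly as below. This is a hypothetical, self-contained illustration: KeyValueStore and InMemoryStore here are simplified stand-ins, not the real org.apache.kafka.streams.state interfaces.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-in for a Streams-style key-value store; NOT the real Kafka API.
interface KeyValueStore<K, V> {
    void put(K key, V value);
    void delete(K key);
    List<K> allKeys();

    // A possible default implementation: fall back on per-key deletes for
    // stores (e.g. remote databases) with no native "wipe everything".
    default void deleteAll() {
        // Copy the key set first so we don't mutate while iterating.
        for (K key : new ArrayList<>(allKeys())) {
            delete(key);
        }
    }
}

class InMemoryStore<K, V> implements KeyValueStore<K, V> {
    private final Map<K, V> data = new HashMap<>();
    @Override public void put(K key, V value) { data.put(key, value); }
    @Override public void delete(K key) { data.remove(key); }
    @Override public List<K> allKeys() { return new ArrayList<>(data.keySet()); }
    int size() { return data.size(); }
}

public class DeleteAllSketch {
    static int demo() {
        InMemoryStore<String, Long> store = new InMemoryStore<>();
        store.put("a", 1L);
        store.put("b", 2L);
        store.deleteAll(); // falls back on delete(key) per key
        return store.size();
    }

    public static void main(String[] args) {
        System.out.println("keys remaining after deleteAll: " + demo());
    }
}
```

As the thread notes, this is O(n) delete calls for a remote store, so a real KIP would presumably also let stores with a native truncate operation override the default.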
Re: Kafka custom partition - consumers assignor with custom per partition user/custom data
I believe I already answered your question in another channel, but just to follow up in this thread in case anyone else is interested in the answer:

You can override the *ConsumerPartitionAssignor.onAssignment(Assignment, ConsumerGroupMetadata)* method to get an update on the currently assigned partitions after each rebalance. The *Assignment* parameter contains two fields: the assigned partitions, and the serialized assignment userdata, if any.

On Sat, Mar 6, 2021 at 6:34 AM Mazen Ezzeddine < mazen.ezzedd...@etu.univ-cotedazur.fr> wrote:

> Hi all,
>
> I am implementing a custom consumer-to-topic/partition assignor in Kafka.
> To this end, I am extending AbstractPartitionAssignor, which in turn
> implements the ConsumerPartitionAssignor interface.
>
> As part of the custom assignor, I want to send a single (float) piece of
> information about each partition of each topic that the consumer
> subscribes to.
>
> I am aware that I can send custom data to the assignor by overriding the
> default method ByteBuffer subscriptionUserData(Set<String> topics).
> However, the issue is that from the method signature above I cannot get
> the list of partitions assigned to the underlying consumer for each of
> the topics that the consumer registers for.
>
> On the other hand, I can see that the Subscription class sent by each of
> the consumers to the group coordinator has a list of the owned partitions
> per consumer:
>
> public static final class Subscription {
>     private final List<String> topics;
>     private final ByteBuffer userData;
>     private final List<TopicPartition> ownedPartitions;
>     private Optional<String> groupInstanceId;
>
> Any hint on how I can send custom per-partition data to the group
> coordinator through the method ByteBuffer subscriptionUserData(Set<String>
> topics), or using any other way that relies only on the public Kafka APIs?
>
> Thank you.
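As a rough illustration of the callback approach described above: an assignor can cache the partitions it receives in onAssignment for later use. All types below are simplified stand-ins for the real org.apache.kafka.clients.consumer classes, not the actual API; only the shape of the override is the point.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Simplified stand-ins for org.apache.kafka.common.TopicPartition and
// ConsumerPartitionAssignor.Assignment; NOT the real client classes.
record TopicPartition(String topic, int partition) {}
record Assignment(List<TopicPartition> partitions) {}

class MyAssignor {
    // Cached after each rebalance; readable when building the next subscription.
    private List<TopicPartition> currentAssignment = Collections.emptyList();

    // Mirrors ConsumerPartitionAssignor.onAssignment(Assignment, ConsumerGroupMetadata).
    public void onAssignment(Assignment assignment) {
        this.currentAssignment = new ArrayList<>(assignment.partitions());
    }

    public List<TopicPartition> assignedPartitions() {
        return currentAssignment;
    }
}

public class OnAssignmentSketch {
    static int demo() {
        MyAssignor assignor = new MyAssignor();
        assignor.onAssignment(new Assignment(List.of(
                new TopicPartition("orders", 0),
                new TopicPartition("orders", 1))));
        return assignor.assignedPartitions().size();
    }

    public static void main(String[] args) {
        System.out.println("partitions cached: " + demo());
    }
}
```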
Re: Slightly Modified Sticky Assignor.
Hey Mazen,

The easiest way to approach this is probably to pass in a reference to the associated Consumer and then just call one of the *Consumer#committed* methods, which return the OffsetAndMetadata. But I'm guessing your underlying question may be about how to get that reference to the Consumer in the first place. There really isn't very good support for this in the ConsumerPartitionAssignor interface, since it relies on reflection to instantiate the assignor with the default constructor.

I would recommend making your custom ConsumerPartitionAssignor implement the Configurable interface, and then using the configs you pass in to the Consumer to ultimately get a handle on it. Since you have to provide the configs to construct the Consumer in the first place, you might need a layer of indirection. For example:

class ConsumerProvider {
    private Consumer consumer;
    public void setConsumer(Consumer consumer);
    public Consumer getConsumer();
}

// main code
ConsumerProvider provider = new ConsumerProvider();
consumerConfig.put("MY_CONSUMER_PROVIDER", provider);
Consumer consumer = new KafkaConsumer(consumerConfig, ...);
provider.setConsumer(consumer);

class MyAssignor implements ConsumerPartitionAssignor, Configurable {
    private ConsumerProvider consumerProvider;

    @Override
    public void configure(configs) {
        this.consumerProvider = configs.get("MY_CONSUMER_PROVIDER");
    }

    @Override
    ByteBuffer subscriptionUserData(topics) {
        offsets = consumerProvider.getConsumer().committed(...);
    }
}

Just a warning: I haven't actually tested this out, but the general idea of using configs should work. I know you said "seamless" and this is anything but :/ Maybe I'm tired and missing something obvious, but clearly there's room for improvement here.
You can file a JIRA ticket to improve the partition assignor experience and make it easier to set up (and even work on this yourself if you're interested).

Unfortunately you generally want to keep the subscriptionUserData() method pretty light, and this method will make a remote call to the server. To be honest, I'm not totally sure why that is the case, since the Consumer should know what offsets it's committed for which partitions...maybe someone else can jump in on the choice behind that. This has come up before, so it's worth investigating whether we can just return the cached offsets if they're available and only make a remote call for *Consumer#committed* if absolutely necessary. I'll try to follow up on that.

On Tue, Mar 9, 2021 at 9:26 AM Mazen Ezzeddine < mazen.ezzedd...@etu.univ-cotedazur.fr> wrote:

> Hi all,
>
> I am implementing a custom partition assignor slightly different from the
> sticky assignor. As known, in the sticky assignor each consumer sends the
> set of owned partitions as part of its subscription message. This happens
> in subscriptionUserData by calling the serializeTopicPartitionAssignment
> method etc…
>
> Kindly, my question is: what is the most seamless way to get offset
> information (e.g., last committed offset) for each owned partition from
> within the subscriptionUserData method, or generally from within the
> StickyAssignor class, preferably using public APIs?
>
> Thank you.
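The config-indirection idea above can be made concrete as an untested sketch. Consumer and Configurable below are minimal stand-ins for the real Kafka interfaces (the real Consumer#committed takes a set of partitions and goes to the broker), and the "MY_CONSUMER_PROVIDER" key is an arbitrary name chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-ins; NOT the real Kafka classes.
interface Consumer { Map<String, Long> committed(); }
interface Configurable { void configure(Map<String, ?> configs); }

// Mutable holder that breaks the chicken-and-egg problem: the configs must
// exist before the Consumer is built, but the assignor needs the Consumer.
class ConsumerProvider {
    private Consumer consumer;
    public void setConsumer(Consumer consumer) { this.consumer = consumer; }
    public Consumer getConsumer() { return consumer; }
}

class OffsetAwareAssignor implements Configurable {
    static final String PROVIDER_KEY = "MY_CONSUMER_PROVIDER"; // arbitrary name
    private ConsumerProvider consumerProvider;

    @Override
    public void configure(Map<String, ?> configs) {
        // Kafka would invoke this after instantiating the assignor reflectively.
        this.consumerProvider = (ConsumerProvider) configs.get(PROVIDER_KEY);
    }

    Map<String, Long> committedOffsets() {
        return consumerProvider.getConsumer().committed();
    }
}

public class ProviderSketch {
    static long demo() {
        Map<String, Object> consumerConfig = new HashMap<>();
        ConsumerProvider provider = new ConsumerProvider();
        consumerConfig.put(OffsetAwareAssignor.PROVIDER_KEY, provider);

        // In real code this would be: new KafkaConsumer<>(consumerConfig, ...)
        Consumer consumer = () -> Map.of("t-0", 42L);
        provider.setConsumer(consumer);

        OffsetAwareAssignor assignor = new OffsetAwareAssignor();
        assignor.configure(consumerConfig);
        return assignor.committedOffsets().get("t-0");
    }

    public static void main(String[] args) {
        System.out.println("committed offset seen by assignor: " + demo());
    }
}
```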
Emit events that are NOT joined
Hi,

I am trying to find the best pattern to solve a specific problem using Kafka Streams. All of our current processing uses the Kafka Streams API (multiple joins, windows, repartitions, etc.), so I already think I have a decent grasp of the fundamentals.

We have 2 streams of events:
- primary events (P), which indicate some key event in the system and carry a large amount of data
- secondary events (S), which should *always* occur as a follow-on to the primary event and only contain a reference to the single associated primary event

I want to join secondary events to primary events (the simple part) BUT I also want to find out when secondary events have been *unable* to be joined. A secondary is unable to be joined:
- when primary event delivery has been delayed (so that secondary events are received before the associated primary event)
- when primary events go missing (the event collection system is noisy, so we do lose a small but significant number of primary events)
- due to coding errors in the collectors, where an incorrect reference has been inserted into the secondary event

Currently this functionality is implemented using a database:
- primary events are inserted into the database and then secondary events look up the primary by reference. If the primary is found, the secondary is sent to a "JOINED" topic.
- if the primary is not found, the secondary event is buffered in the database until the primary is received and then joined+emitted (and the secondary event is removed from the DB)
- after some arbitrary time period, the database is queried for outstanding not-joined secondary events and they are emitted to an "UNJOINED" topic. This allows alerting on unmatched secondary events to drive quality measures, and allows offline analysis (to understand why).

Some options:
1. Implement the same strategy as existing, except using Kafka state stores instead of the DB. With this approach I am concerned about atomic correctness - i.e. that state in the Kafka store can be managed so that the event is never sent to both JOINED and UNJOINED.
2. Continually emit key-values to a "PENDING" topic for the secondary join. An example sequence could be something like the following (where primary events = P, secondary events = S):
   a) receive S with no matching P => emit {S, false}
   b) receive matching P for S => emit {S, null} (to effectively delete it from the topic)
   c) receive S with matching P => do not emit anything
   Now the problem becomes more like building a time window of events from PENDING, to eventually emit the events to UNJOINED. I am also uncertain how to ensure events can never end up in both JOINED and UNJOINED.

My apologies for the wall of text .. I find it a difficult problem to explain. 😏 Is there some pattern I am missing that will help solve this problem? Any help / other suggestions would be appreciated.

Thanks,
Ross
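One way to reason about option 1 is to model the pending buffer and the timeout sweep outside of Streams first. The sketch below is illustrative only (plain maps stand in for state stores, and expire() for a punctuator; all names are made up). The invariant it demonstrates is that a secondary leaves the pending buffer exactly once, toward either JOINED or UNJOINED, never both.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal model: primaries arrive as (ref -> payload); secondaries carry the ref.
public class JoinSketch {
    record Secondary(String ref, long arrivalMs) {}

    private final Map<String, String> primaries = new HashMap<>();  // "store" of primaries
    private final Map<String, Secondary> pending = new HashMap<>(); // buffered unmatched secondaries
    final List<String> joined = new ArrayList<>();
    final List<String> unjoined = new ArrayList<>();

    void onPrimary(String ref, String payload, long nowMs) {
        primaries.put(ref, payload);
        // Late primary: release any buffered secondary exactly once.
        Secondary s = pending.remove(ref);
        if (s != null) joined.add(ref);
    }

    void onSecondary(String ref, long nowMs) {
        if (primaries.containsKey(ref)) {
            joined.add(ref);
        } else {
            pending.put(ref, new Secondary(ref, nowMs)); // buffer, don't emit yet
        }
    }

    // Punctuator analogue: anything pending longer than timeoutMs is emitted
    // to UNJOINED and removed, so it can never also reach JOINED later.
    void expire(long nowMs, long timeoutMs) {
        pending.values().removeIf(s -> {
            if (nowMs - s.arrivalMs() >= timeoutMs) {
                unjoined.add(s.ref());
                return true;
            }
            return false;
        });
    }

    static JoinSketch demo() {
        JoinSketch j = new JoinSketch();
        j.onSecondary("A", 0);     // arrives before its primary
        j.onPrimary("A", "p", 50); // late primary -> A joins
        j.onSecondary("B", 60);    // primary for B never arrives
        j.expire(10_060, 10_000);  // B times out -> UNJOINED
        return j;
    }

    public static void main(String[] args) {
        JoinSketch j = demo();
        System.out.println("joined=" + j.joined + " unjoined=" + j.unjoined);
    }
}
```

In an actual Streams implementation the pending map would be a persistent state store inside a Processor/Transformer and expire() a scheduled punctuator; with EOS enabled, the store mutation and the downstream forward commit in the same transaction, which is what addresses the both-topics concern.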
Re: [VOTE] 2.6.2 RC0
Thanks Luke. I'll make sure to bump this in the kafka-site repo once the release is finalized.

On Mon, Mar 15, 2021 at 8:58 PM Luke Chen wrote:
> Hi Sophie,
> A small doc update for 2.6.2. I think we missed it.
> https://github.com/apache/kafka/pull/10328
>
> Thanks.
> Luke
>
> On Tue, Mar 16, 2021 at 11:12 AM Guozhang Wang wrote:
> > Hi Sophie,
> >
> > I've reviewed the javadocs / release notes / documentations, and they LGTM.
> >
> > +1.
> >
> > Guozhang
> >
> > On Fri, Mar 12, 2021 at 10:48 AM Sophie Blee-Goldman wrote:
> > > Hello Kafka users, developers and client-developers,
> > >
> > > This is the first candidate for release of Apache Kafka 2.6.2.
> > >
> > > Apache Kafka 2.6.2 is a bugfix release and fixes 30 issues since the
> > > 2.6.1 release. Please see the release notes for more information.
> > >
> > > Release notes for the 2.6.2 release:
> > > https://home.apache.org/~ableegoldman/kafka-2.6.2-rc0/RELEASE_NOTES.html
> > >
> > > *** Please download, test and vote by Friday, March 19th, 9am PST
> > >
> > > Kafka's KEYS file containing PGP keys we use to sign the release:
> > > https://kafka.apache.org/KEYS
> > >
> > > * Release artifacts to be voted upon (source and binary):
> > > https://home.apache.org/~ableegoldman/kafka-2.6.2-rc0/
> > >
> > > * Maven artifacts to be voted upon:
> > > https://repository.apache.org/content/groups/staging/org/apache/kafka/
> > >
> > > * Javadoc:
> > > https://home.apache.org/~ableegoldman/kafka-2.6.2-rc0/javadoc/
> > >
> > > * Tag to be voted upon (off 2.6 branch) is the 2.6.2 tag:
> > > https://github.com/apache/kafka/releases/tag/2.6.2-rc0
> > >
> > > * Documentation:
> > > https://kafka.apache.org/26/documentation.html
> > >
> > > * Protocol:
> > > https://kafka.apache.org/26/protocol.html
> > >
> > > * Successful Jenkins builds for the 2.6 branch:
> > > Unit/integration tests:
> > > https://ci-builds.apache.org/job/Kafka/job/kafka-2.6-jdk8/105/
> > > System tests:
> > > https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4435/
> > >
> > > /**
> > >
> > > Thanks,
> > > Sophie
> >
> > --
> > -- Guozhang
Re: [VOTE] 2.6.2 RC0
Hi Sophie,
A small doc update for 2.6.2. I think we missed it.
https://github.com/apache/kafka/pull/10328

Thanks.
Luke
Re: Redis as state store
Yep, that fell off my radar. Here we go: https://issues.apache.org/jira/browse/KAFKA-12475 On Mon, Mar 15, 2021 at 8:09 PM Guozhang Wang wrote: > Hey Sophie, > > Maybe we can first create a JIRA ticket for this? > > On Mon, Mar 15, 2021 at 3:09 PM Sophie Blee-Goldman > wrote: > > > Sounds good! I meant anyone who is interested :) > > > > Let me know if you have any questions after digging in to this > > > > On Mon, Mar 15, 2021 at 2:39 PM Alex Craig > wrote: > > > > > Hey Sophie, not sure if you meant me or not but I'd be happy to take a > > > stab at creating a KIP for this. I want to spend some time digging > into > > > more of how this works first, but will then try to gather my thoughts > and > > > get something created. > > > > > > Alex > > > > > > On Mon, Mar 15, 2021 at 1:48 PM Sophie Blee-Goldman > > > wrote: > > > > > > > This certainly does seem like a flaw in the Streams API, although of > > > course > > > > Streams is just > > > > in general not designed for use with remote anything (API calls, > > stores, > > > > etc) > > > > > > > > That said, I don't see any reason why we *couldn't* have better > support > > > for > > > > remote state stores. > > > > Note that there's currently no deleteAll method on the store > interface, > > > and > > > > not all databases > > > > necessarily support that. But we could add a default implementation > > which > > > > just calls delete(key) > > > > on all keys in the state store, and for the RocksDB-based state > stores > > we > > > > still wipe out the state > > > > as usual (and recommend the same for any custom StateStore which is > > local > > > > rather than remote). > > > > Obviously falling back on individual delete(key) operations for all > the > > > > data in the entire store will > > > > have worse performance, but that's better than silently breaking EOS > > when > > > > deleteAll is not available > > > > on a remote store. > > > > > > > > Would you be interested in doing a KIP for this? 
We should also file > a > > > JIRA > > > > with the above explanation, > > > > so that other users of remote storage are aware of this limitation. > And > > > > definitely document this somewhere > > > > > > > > > > > > On Mon, Mar 15, 2021 at 8:04 AM Bruno Cadonna > > > > > > > > > wrote: > > > > > > > > > Hi Alex, > > > > > > > > > > I guess wiping out the state directory is easier code-wise, faster, > > > > > and/or at the time of development the developers did not design for > > > > > remote state stores. But I do actually not know the exact reason. > > > > > > > > > > Off the top of my head, I do not know how to solve this for remote > > > state > > > > > stores. Using the uncaught exception handler is not good, because a > > > > > computing node could fail without giving the JVM the opportunity to > > > > > throw an exception. > > > > > > > > > > In your tests, try to increase the commit interval to a high value > > and > > > > > see if you get inconsistencies. You should get an inconsistency if > > the > > > > > state store maintains counts for keys and after the last commit > > before > > > > > the failure, the Streams app puts an event with a new key K with > > value > > > 1 > > > > > into the state store. After failover, Streams would put the same > > event > > > > > with key K again into the state store. If the state store deleted > all > > > of > > > > > its data, Streams would put again value 1, but if the state store > did > > > > > not delete all data, Streams would put value 2 which is wrong > because > > > it > > > > > would count the same event twice. > > > > > > > > > > Best, > > > > > Bruno > > > > > > > > > > > > > > > On 15.03.21 15:20, Alex Craig wrote: > > > > > > Bruno, > > > > > > Thanks for the info! that makes sense. Of course now I have > more > > > > > > questions. :) Do you know why this is being done outside of the > > > state > > > > > > store API? 
I assume there are reasons why a "deleteAll()" type > of > > > > > function > > > > > > wouldn't work, thereby allowing a state store to purge itself? > And > > > > maybe > > > > > > more importantly, is there a way to achieve a similar behavior > > with a > > > > 3rd > > > > > > party store? I'm not sure if hooking into the uncaught exception > > > > handler > > > > > > might be a good way to purge/drop a state store in the event of a > > > fatal > > > > > > error? I did setup a MongoDB state store recently as part of a > POC > > > and > > > > > was > > > > > > testing it with EOS enabled. (forcing crashes to occur and > > checking > > > > that > > > > > > the result of my aggregation was still accurate) I was unable to > > > cause > > > > > > inconsistent data in the mongo store (which is good!), though of > > > > course I > > > > > > may just have been getting lucky. Thanks again for your help, > > > > > > > > > > > > Alex > > > > > > > > > > > > On Mon, Mar 15, 2021 at 8:59 AM Pushkar Deole < > > pdeole2...@gmail.com> > > > > > wrote: > > > > > > > > > > > >> Bruno
Re: [VOTE] 2.6.2 RC0
Hi Sophie,

I've reviewed the javadocs / release notes / documentations, and they LGTM.

+1.

Guozhang

--
-- Guozhang
Re: Redis as state store
Hey Sophie, Maybe we can first create a JIRA ticket for this? On Mon, Mar 15, 2021 at 3:09 PM Sophie Blee-Goldman wrote: > Sounds good! I meant anyone who is interested :) > > Let me know if you have any questions after digging in to this > > On Mon, Mar 15, 2021 at 2:39 PM Alex Craig wrote: > > > Hey Sophie, not sure if you meant me or not but I'd be happy to take a > > stab at creating a KIP for this. I want to spend some time digging into > > more of how this works first, but will then try to gather my thoughts and > > get something created. > > > > Alex > > > > On Mon, Mar 15, 2021 at 1:48 PM Sophie Blee-Goldman > > wrote: > > > > > This certainly does seem like a flaw in the Streams API, although of > > course > > > Streams is just > > > in general not designed for use with remote anything (API calls, > stores, > > > etc) > > > > > > That said, I don't see any reason why we *couldn't* have better support > > for > > > remote state stores. > > > Note that there's currently no deleteAll method on the store interface, > > and > > > not all databases > > > necessarily support that. But we could add a default implementation > which > > > just calls delete(key) > > > on all keys in the state store, and for the RocksDB-based state stores > we > > > still wipe out the state > > > as usual (and recommend the same for any custom StateStore which is > local > > > rather than remote). > > > Obviously falling back on individual delete(key) operations for all the > > > data in the entire store will > > > have worse performance, but that's better than silently breaking EOS > when > > > deleteAll is not available > > > on a remote store. > > > > > > Would you be interested in doing a KIP for this? We should also file a > > JIRA > > > with the above explanation, > > > so that other users of remote storage are aware of this limitation. 
And > > > definitely document this somewhere > > > > > > > > > On Mon, Mar 15, 2021 at 8:04 AM Bruno Cadonna > > > > > > wrote: > > > > > > > Hi Alex, > > > > > > > > I guess wiping out the state directory is easier code-wise, faster, > > > > and/or at the time of development the developers did not design for > > > > remote state stores. But I do actually not know the exact reason. > > > > > > > > Off the top of my head, I do not know how to solve this for remote > > state > > > > stores. Using the uncaught exception handler is not good, because a > > > > computing node could fail without giving the JVM the opportunity to > > > > throw an exception. > > > > > > > > In your tests, try to increase the commit interval to a high value > and > > > > see if you get inconsistencies. You should get an inconsistency if > the > > > > state store maintains counts for keys and after the last commit > before > > > > the failure, the Streams app puts an event with a new key K with > value > > 1 > > > > into the state store. After failover, Streams would put the same > event > > > > with key K again into the state store. If the state store deleted all > > of > > > > its data, Streams would put again value 1, but if the state store did > > > > not delete all data, Streams would put value 2 which is wrong because > > it > > > > would count the same event twice. > > > > > > > > Best, > > > > Bruno > > > > > > > > > > > > On 15.03.21 15:20, Alex Craig wrote: > > > > > Bruno, > > > > > Thanks for the info! that makes sense. Of course now I have more > > > > > questions. :) Do you know why this is being done outside of the > > state > > > > > store API? I assume there are reasons why a "deleteAll()" type of > > > > function > > > > > wouldn't work, thereby allowing a state store to purge itself? And > > > maybe > > > > > more importantly, is there a way to achieve a similar behavior > with a > > > 3rd > > > > > party store? 
I'm not sure if hooking into the uncaught exception > > > handler > > > > > might be a good way to purge/drop a state store in the event of a > > fatal > > > > > error? I did setup a MongoDB state store recently as part of a POC > > and > > > > was > > > > > testing it with EOS enabled. (forcing crashes to occur and > checking > > > that > > > > > the result of my aggregation was still accurate) I was unable to > > cause > > > > > inconsistent data in the mongo store (which is good!), though of > > > course I > > > > > may just have been getting lucky. Thanks again for your help, > > > > > > > > > > Alex > > > > > > > > > > On Mon, Mar 15, 2021 at 8:59 AM Pushkar Deole < > pdeole2...@gmail.com> > > > > wrote: > > > > > > > > > >> Bruno, > > > > >> > > > > >> i tried to explain this in 'kafka user's language through above > > > > mentioned > > > > >> scenario, hope i put it properly -:) and correct me if i am wrong > > > > >> > > > > >> On Mon, Mar 15, 2021 at 7:23 PM Pushkar Deole < > pdeole2...@gmail.com > > > > > > > >> wrote: > > > > >> > > > > >>> This is what I understand could be the issue with external state > > > store: > > > > >>> > > > > >>> kafka stream appl
Right Use Case For Kafka Streams?
Hi,

We have a requirement to calculate metrics on a huge number of keys (could be hundreds of millions, perhaps billions of keys - attempting caching on individual keys in many cases will have almost a 0% cache hit rate). Is Kafka Streams with RocksDB and compacted topics the right tool for a task like that?

As well, just from playing with Kafka Streams for a week, it feels like it wants to create a lot of separate stores by default (if I want to calculate aggregates on five, ten and 30 days I will get three separate stores by default for this state data). Coming from a different distributed storage solution, I feel like I want to put them together in one store, as I/O has always been my bottleneck (1 big read and 1 big write is better than three small separate reads and three small separate writes). But am I perhaps missing something here? I don't want to avoid the DSL that Kafka Streams provides if I don't have to. Will the Kafka Streams RocksDB solution be so much faster than a distributed read that it won't be the bottleneck even with huge amounts of data?

Any info/opinions would be greatly appreciated.

Thanks in advance,
Gareth Collins
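On the one-store-versus-three question: with the Processor API (rather than the DSL) it is possible to keep a single value object per key that carries all the aggregates, so each event costs one read and one write. A toy sketch of that shape follows; the HashMap stands in for a RocksDB-backed store, all names are illustrative, and window retraction is deliberately omitted.

```java
import java.util.HashMap;
import java.util.Map;

// One value per key carrying all three rolling aggregates, updated together:
// one read + one write per event instead of three of each.
public class CombinedAggSketch {
    record Aggregates(double d5, double d10, double d30) {
        Aggregates add(double v) { return new Aggregates(d5 + v, d10 + v, d30 + v); }
    }

    private final Map<String, Aggregates> store = new HashMap<>(); // stand-in for RocksDB

    void update(String key, double value) {
        Aggregates current = store.getOrDefault(key, new Aggregates(0, 0, 0));
        store.put(key, current.add(value)); // single write covers all horizons
    }

    Aggregates get(String key) { return store.get(key); }

    static double demo() {
        CombinedAggSketch s = new CombinedAggSketch();
        s.update("k", 2.0);
        s.update("k", 3.0);
        return s.get("k").d30();
    }

    public static void main(String[] args) {
        System.out.println("30-day aggregate: " + demo());
    }
}
```

The part this glosses over is expiring old data out of the 5/10/30-day horizons, which is exactly what the DSL's separate windowed stores handle for you; whether one combined store wins depends on whether you can tolerate approximate retraction (e.g. bucketed sub-aggregates inside the value).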
Re: Redis as state store
Sounds good! I meant anyone who is interested :) Let me know if you have any questions after digging in to this On Mon, Mar 15, 2021 at 2:39 PM Alex Craig wrote: > Hey Sophie, not sure if you meant me or not but I'd be happy to take a > stab at creating a KIP for this. I want to spend some time digging into > more of how this works first, but will then try to gather my thoughts and > get something created. > > Alex > > On Mon, Mar 15, 2021 at 1:48 PM Sophie Blee-Goldman > wrote: > > > This certainly does seem like a flaw in the Streams API, although of > course > > Streams is just > > in general not designed for use with remote anything (API calls, stores, > > etc) > > > > That said, I don't see any reason why we *couldn't* have better support > for > > remote state stores. > > Note that there's currently no deleteAll method on the store interface, > and > > not all databases > > necessarily support that. But we could add a default implementation which > > just calls delete(key) > > on all keys in the state store, and for the RocksDB-based state stores we > > still wipe out the state > > as usual (and recommend the same for any custom StateStore which is local > > rather than remote). > > Obviously falling back on individual delete(key) operations for all the > > data in the entire store will > > have worse performance, but that's better than silently breaking EOS when > > deleteAll is not available > > on a remote store. > > > > Would you be interested in doing a KIP for this? We should also file a > JIRA > > with the above explanation, > > so that other users of remote storage are aware of this limitation. And > > definitely document this somewhere > > > > > > On Mon, Mar 15, 2021 at 8:04 AM Bruno Cadonna > > > wrote: > > > > > Hi Alex, > > > > > > I guess wiping out the state directory is easier code-wise, faster, > > > and/or at the time of development the developers did not design for > > > remote state stores. But I do actually not know the exact reason. 
> > > > > > Off the top of my head, I do not know how to solve this for remote > state > > > stores. Using the uncaught exception handler is not good, because a > > > computing node could fail without giving the JVM the opportunity to > > > throw an exception. > > > > > > In your tests, try to increase the commit interval to a high value and > > > see if you get inconsistencies. You should get an inconsistency if the > > > state store maintains counts for keys and after the last commit before > > > the failure, the Streams app puts an event with a new key K with value > 1 > > > into the state store. After failover, Streams would put the same event > > > with key K again into the state store. If the state store deleted all > of > > > its data, Streams would put again value 1, but if the state store did > > > not delete all data, Streams would put value 2 which is wrong because > it > > > would count the same event twice. > > > > > > Best, > > > Bruno > > > > > > > > > On 15.03.21 15:20, Alex Craig wrote: > > > > Bruno, > > > > Thanks for the info! that makes sense. Of course now I have more > > > > questions. :) Do you know why this is being done outside of the > state > > > > store API? I assume there are reasons why a "deleteAll()" type of > > > function > > > > wouldn't work, thereby allowing a state store to purge itself? And > > maybe > > > > more importantly, is there a way to achieve a similar behavior with a > > 3rd > > > > party store? I'm not sure if hooking into the uncaught exception > > handler > > > > might be a good way to purge/drop a state store in the event of a > fatal > > > > error? I did setup a MongoDB state store recently as part of a POC > and > > > was > > > > testing it with EOS enabled. (forcing crashes to occur and checking > > that > > > > the result of my aggregation was still accurate) I was unable to > cause > > > > inconsistent data in the mongo store (which is good!), though of > > course I > > > > may just have been getting lucky. 
Thanks again for your help, > > > > > > > > Alex > > > > > > > > On Mon, Mar 15, 2021 at 8:59 AM Pushkar Deole > > > wrote: > > > > > > > >> Bruno, > > > >> > > > >> i tried to explain this in 'kafka user's language through above > > > mentioned > > > >> scenario, hope i put it properly -:) and correct me if i am wrong > > > >> > > > >> On Mon, Mar 15, 2021 at 7:23 PM Pushkar Deole > > > > >> wrote: > > > >> > > > >>> This is what I understand could be the issue with external state > > store: > > > >>> > > > >>> kafka stream application consumes source topic, does processing, > > stores > > > >>> state to kafka state store (this is backed by topic) and before > > > producing > > > >>> event on destination topic, the application fails with some issue. > In > > > >>> this case, the transaction has failed, so kafka guarantees either > all > > > or > > > >>> none, means offset written to source topic, state written to state > > > store > > > >>> topic, output produced on de
MirrorMaker 2 and Negative Lag
I am running MirrorMaker 2 (Kafka 2.7), trying to migrate all topics from one cluster to another while preserving consumer group offsets via `sync.group.offsets.enabled=true`. My source cluster is running Kafka 0.10, while the target cluster is running 2.6.1. While I can see data being replicated, the data on the replicated consumer group in the target cluster looks wrong: the lag values of the replicated consumer group are large negative values, and the LOG-END-OFFSET values are mostly 0. I determined this information from kafka-consumer-groups.sh. I also checked the kafka_consumer_consumer_fetch_manager_metrics_records_lag JMX metric in MM2, and the reported lag is zero for all partitions. By using `sync.group.offsets.enabled=true`, I envisioned that MM2 would automatically replicate and sync all consumer groups with a meaningful offset in the target cluster. Am I misunderstanding how MM2 is supposed to work? Here are my mm2.properties and the consumer group details.

# mm2.properties
```
clusters = src, dst
src.bootstrap.servers = 10.0.0.1:9092
dst.bootstrap.servers = 10.0.0.2:9092
src->dst.enabled = true
src->dst.topics = compute.*
src->dst.offset.flush.timeout.ms=6
src->dst.buffer.memory=1
dst->src.enabled = true
dst->src.topics = .*
replication.factor=3
src->dst.sync.group.offsets.enabled = true
src->dst.emit.checkpoints.enabled = true
src->dst.consumer.auto.offset.reset=latest
consumer.auto.offset.reset = latest
auto.offset.reset = latest
replication.policy.class = com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy
checkpoints.topic.replication.factor=3
heartbeats.topic.replication.factor=3
offset-syncs.topic.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
config.storage.replication.factor=3
sync.topic.acls.enabled = false
sync.group.offsets.enabled = true
emit.checkpoints.enabled = true
tasks.max = 8
dst.producer.offset.flush.timeout.ms = 6
dst.offset.flush.timeout.ms = 6
```

Consumer Group details
```
GROUP                         TOPIC             PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG       CONSUMER-ID  HOST  CLIENT-ID
kafka-group-Compute-Requests  Compute-Requests  57         5305947         0               -5305947  -            -     -
kafka-group-Compute-Requests  Compute-Requests  20         5164205         0               -5164205  -            -     -
kafka-group-Compute-Requests  Compute-Requests  53         4208527         0               -4208527  -            -     -
kafka-group-Compute-Requests  Compute-Requests  82         5247928         0               -5247928  -            -     -
kafka-group-Compute-Requests  Compute-Requests  65         5574520         0               -5574520  -            -     -
kafka-group-Compute-Requests  Compute-Requests  11         5190708         209             -5190499  -            -     -
```

Thanks ... Alan
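The LAG column above is simple arithmetic: lag = LOG-END-OFFSET minus CURRENT-OFFSET. A minimal sketch, using the values from the output above:

```java
// Sketch of the arithmetic behind the LAG column of kafka-consumer-groups.sh:
// lag = log-end-offset - current-offset. A synced group offset pointing at a
// target partition that is still (nearly) empty therefore shows up as a large
// negative lag. The values below are taken from the consumer-group output above.
public class LagSketch {
    static long lag(long logEndOffset, long currentOffset) {
        return logEndOffset - currentOffset;
    }

    public static void main(String[] args) {
        System.out.println(lag(0, 5305947));   // partition 57 -> -5305947
        System.out.println(lag(209, 5190708)); // partition 11 -> -5190499
    }
}
```

In other words, the committed offsets were synced while the target partitions' log-end offsets are still near zero; one thing worth checking is whether the custom replication policy maps the replicated data and the synced group offsets onto the same target topic names.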
Re: Redis as state store
Hey Sophie, not sure if you meant me or not but I'd be happy to take a stab at creating a KIP for this. I want to spend some time digging into more of how this works first, but will then try to gather my thoughts and get something created. Alex On Mon, Mar 15, 2021 at 1:48 PM Sophie Blee-Goldman wrote: > This certainly does seem like a flaw in the Streams API, although of course > Streams is just > in general not designed for use with remote anything (API calls, stores, > etc) > > That said, I don't see any reason why we *couldn't* have better support for > remote state stores. > Note that there's currently no deleteAll method on the store interface, and > not all databases > necessarily support that. But we could add a default implementation which > just calls delete(key) > on all keys in the state store, and for the RocksDB-based state stores we > still wipe out the state > as usual (and recommend the same for any custom StateStore which is local > rather than remote). > Obviously falling back on individual delete(key) operations for all the > data in the entire store will > have worse performance, but that's better than silently breaking EOS when > deleteAll is not available > on a remote store. > > Would you be interested in doing a KIP for this? We should also file a JIRA > with the above explanation, > so that other users of remote storage are aware of this limitation. And > definitely document this somewhere > > > On Mon, Mar 15, 2021 at 8:04 AM Bruno Cadonna > wrote: > > > Hi Alex, > > > > I guess wiping out the state directory is easier code-wise, faster, > > and/or at the time of development the developers did not design for > > remote state stores. But I do actually not know the exact reason. > > > > Off the top of my head, I do not know how to solve this for remote state > > stores. Using the uncaught exception handler is not good, because a > > computing node could fail without giving the JVM the opportunity to > > throw an exception. 
> > > > In your tests, try to increase the commit interval to a high value and > > see if you get inconsistencies. You should get an inconsistency if the > > state store maintains counts for keys and after the last commit before > > the failure, the Streams app puts an event with a new key K with value 1 > > into the state store. After failover, Streams would put the same event > > with key K again into the state store. If the state store deleted all of > > its data, Streams would put again value 1, but if the state store did > > not delete all data, Streams would put value 2 which is wrong because it > > would count the same event twice. > > > > Best, > > Bruno > > > > > > On 15.03.21 15:20, Alex Craig wrote: > > > Bruno, > > > Thanks for the info! that makes sense. Of course now I have more > > > questions. :) Do you know why this is being done outside of the state > > > store API? I assume there are reasons why a "deleteAll()" type of > > function > > > wouldn't work, thereby allowing a state store to purge itself? And > maybe > > > more importantly, is there a way to achieve a similar behavior with a > 3rd > > > party store? I'm not sure if hooking into the uncaught exception > handler > > > might be a good way to purge/drop a state store in the event of a fatal > > > error? I did setup a MongoDB state store recently as part of a POC and > > was > > > testing it with EOS enabled. (forcing crashes to occur and checking > that > > > the result of my aggregation was still accurate) I was unable to cause > > > inconsistent data in the mongo store (which is good!), though of > course I > > > may just have been getting lucky. 
Thanks again for your help, > > > > > > Alex > > > > > > On Mon, Mar 15, 2021 at 8:59 AM Pushkar Deole > > wrote: > > > > > >> Bruno, > > >> > > >> i tried to explain this in 'kafka user's language through above > > mentioned > > >> scenario, hope i put it properly -:) and correct me if i am wrong > > >> > > >> On Mon, Mar 15, 2021 at 7:23 PM Pushkar Deole > > >> wrote: > > >> > > >>> This is what I understand could be the issue with external state > store: > > >>> > > >>> kafka stream application consumes source topic, does processing, > stores > > >>> state to kafka state store (this is backed by topic) and before > > producing > > >>> event on destination topic, the application fails with some issue. In > > >>> this case, the transaction has failed, so kafka guarantees either all > > or > > >>> none, means offset written to source topic, state written to state > > store > > >>> topic, output produced on destination topic... all of these happen or > > >> none > > >>> of these and in this failure scenario it is none of these. > > >>> > > >>> Assume you have redis state store, and you updated the state into > redis > > >>> and stream application failed. Now, you have source topic and > > destination > > >>> topic consistent i.e. offset is not committed to source topic and > > output > > >>> not produced on destination topic
Re: Redis as state store
This certainly does seem like a flaw in the Streams API, although of course Streams is just in general not designed for use with remote anything (API calls, stores, etc) That said, I don't see any reason why we *couldn't* have better support for remote state stores. Note that there's currently no deleteAll method on the store interface, and not all databases necessarily support that. But we could add a default implementation which just calls delete(key) on all keys in the state store, and for the RocksDB-based state stores we still wipe out the state as usual (and recommend the same for any custom StateStore which is local rather than remote). Obviously falling back on individual delete(key) operations for all the data in the entire store will have worse performance, but that's better than silently breaking EOS when deleteAll is not available on a remote store. Would you be interested in doing a KIP for this? We should also file a JIRA with the above explanation, so that other users of remote storage are aware of this limitation. And definitely document this somewhere On Mon, Mar 15, 2021 at 8:04 AM Bruno Cadonna wrote: > Hi Alex, > > I guess wiping out the state directory is easier code-wise, faster, > and/or at the time of development the developers did not design for > remote state stores. But I do actually not know the exact reason. > > Off the top of my head, I do not know how to solve this for remote state > stores. Using the uncaught exception handler is not good, because a > computing node could fail without giving the JVM the opportunity to > throw an exception. > > In your tests, try to increase the commit interval to a high value and > see if you get inconsistencies. You should get an inconsistency if the > state store maintains counts for keys and after the last commit before > the failure, the Streams app puts an event with a new key K with value 1 > into the state store. 
After failover, Streams would put the same event > with key K again into the state store. If the state store deleted all of > its data, Streams would put again value 1, but if the state store did > not delete all data, Streams would put value 2 which is wrong because it > would count the same event twice. > > Best, > Bruno > > > On 15.03.21 15:20, Alex Craig wrote: > > Bruno, > > Thanks for the info! that makes sense. Of course now I have more > > questions. :) Do you know why this is being done outside of the state > > store API? I assume there are reasons why a "deleteAll()" type of > function > > wouldn't work, thereby allowing a state store to purge itself? And maybe > > more importantly, is there a way to achieve a similar behavior with a 3rd > > party store? I'm not sure if hooking into the uncaught exception handler > > might be a good way to purge/drop a state store in the event of a fatal > > error? I did setup a MongoDB state store recently as part of a POC and > was > > testing it with EOS enabled. (forcing crashes to occur and checking that > > the result of my aggregation was still accurate) I was unable to cause > > inconsistent data in the mongo store (which is good!), though of course I > > may just have been getting lucky. Thanks again for your help, > > > > Alex > > > > On Mon, Mar 15, 2021 at 8:59 AM Pushkar Deole > wrote: > > > >> Bruno, > >> > >> i tried to explain this in 'kafka user's language through above > mentioned > >> scenario, hope i put it properly -:) and correct me if i am wrong > >> > >> On Mon, Mar 15, 2021 at 7:23 PM Pushkar Deole > >> wrote: > >> > >>> This is what I understand could be the issue with external state store: > >>> > >>> kafka stream application consumes source topic, does processing, stores > >>> state to kafka state store (this is backed by topic) and before > producing > >>> event on destination topic, the application fails with some issue. 
In > >>> this case, the transaction has failed, so kafka guarantees either all > or > >>> none, means offset written to source topic, state written to state > store > >>> topic, output produced on destination topic... all of these happen or > >> none > >>> of these and in this failure scenario it is none of these. > >>> > >>> Assume you have redis state store, and you updated the state into redis > >>> and stream application failed. Now, you have source topic and > destination > >>> topic consistent i.e. offset is not committed to source topic and > output > >>> not produced on destination topic, but you redis state store is > >>> inconsistent with that since it is external state store and kafka can't > >>> guarantee rollback ot state written there > >>> > >>> On Mon, Mar 15, 2021 at 6:30 PM Alex Craig > >> wrote: > >>> > " Another issue with 3rd party state stores could be violation of > exactly-once guarantee provided by kafka streams in the event of a > >> failure > of streams application instance" > > I've heard this before but would love to know more about how a custom > state > >>>
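Sophie's proposed fallback above, a default deleteAll() built on per-key delete(), can be sketched roughly as follows. The interface below is a simplified stand-in for illustration only, not the actual org.apache.kafka.streams state store API:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Rough sketch of the proposed fallback: a default deleteAll() that iterates
// all keys and removes them one by one, for remote stores that cannot truncate
// in a single call. Slower than a native wipe, but better than silently
// breaking EOS when no bulk delete exists.
interface SimpleKeyValueStore<K, V> {
    V get(K key);
    void put(K key, V value);
    void delete(K key);
    Iterable<K> keys();

    default void deleteAll() {
        // Snapshot the keys first to avoid mutating the store while iterating it.
        List<K> snapshot = new ArrayList<>();
        for (K k : keys()) snapshot.add(k);
        for (K k : snapshot) delete(k);
    }
}

// Minimal in-memory implementation, standing in for a remote store.
class InMemoryStore<K, V> implements SimpleKeyValueStore<K, V> {
    private final Map<K, V> map = new HashMap<>();
    public V get(K key) { return map.get(key); }
    public void put(K key, V value) { map.put(key, value); }
    public void delete(K key) { map.remove(key); }
    public Iterable<K> keys() { return new ArrayList<>(map.keySet()); }
    public int size() { return map.size(); }
}
```

RocksDB-based stores would still be wiped on disk as they are today; only stores without a cheap truncate would fall back to this per-key path.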
Kafka Streams And Partitioning
Hi, This may be a newbie question but is it possible to control the partitioning of a RocksDB KeyValueStore in Kafka Streams? For example, I perhaps only want to partition based on a prefix of a key rather than the full key. I assume something similar must be done for the WindowStore to partition without the window start time and sequence number (otherwise window entries could be spread across partitions)? Sort of like the window store, I am wanting to be able to retrieve all values with a certain key prefix from the KeyValueStore with one read operation. Is this possible? thanks in advance, Gareth Collins
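As a sketch of the idea in the question: partitioning on a fixed-length key prefix so that all keys sharing a prefix land in the same partition. This is illustrative logic only; the hashing here uses String.hashCode, whereas Kafka's default partitioner hashes the serialized key bytes with murmur2, and in a real topology the logic would live in a StreamPartitioner implementation:

```java
// Sketch: compute the partition from a key prefix only, so all keys sharing
// the prefix are co-located and can be read back with one prefixed range scan.
// Hashing via String.hashCode is for illustration; Kafka's default producer
// partitioner uses murmur2 over the serialized key bytes.
public class PrefixPartitioner {
    private final int prefixLength;

    public PrefixPartitioner(int prefixLength) {
        this.prefixLength = prefixLength;
    }

    public int partition(String key, int numPartitions) {
        String prefix = key.length() <= prefixLength ? key : key.substring(0, prefixLength);
        // Math.floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(prefix.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        PrefixPartitioner p = new PrefixPartitioner(4);
        // Keys sharing the "user" prefix map to the same partition.
        System.out.println(p.partition("user-42", 8) == p.partition("user-99", 8)); // true
    }
}
```

Once co-located, a prefixed range read over the key-value store (rather than a point get per key) would retrieve all values under the prefix in one operation.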
Re: Rebalancing and scaling of consumers on kubernetes, instanteous scale to x consumer replicas ==> x rebalancing?
Hey Mazen, There's not necessarily one rebalance per new consumer; in theory, if all 100 consumers are started up at the same time, there may be just a single rebalance. It really depends on the timing -- for example, in the log snippet you provided, you can see that the first member joined at 13:57:34 and the rebalance completed ~3 seconds later at 13:57:37. Then the second member is seen joining at 14:07:43, which is just over 10 minutes later. I think you need to investigate why there's such a long delay between the first and second consumers joining the group. On Mon, Mar 15, 2021 at 7:58 AM Mazen Ezzeddine < mazen.ezzedd...@etu.univ-cotedazur.fr> wrote: > Hi all, > > I have a kafka consumer pod running on kubernetes. I executed the command > kubectl scale consumerName --replicas=2, and as shown in the logs below > two separate rebalancing processes were triggered. So if the number of > consumer replicas scaled = 100, are one hundred separate rebalances going > to be triggered? Is that accurate? Am I missing something? Is there any workaround > to trigger a single rebalance regardless of the number of replicas in the > scale command? 
> > > group coordinator logs > = > > > 2021-03-15 13:57:34,230 INFO [GroupCoordinator 1]: Preparing to rebalance > group debugconsumerlag in state PreparingRebalance with old generation 0 > (__consumer_offsets-31) (reason: Adding new member > consumer-debugconsumerlag-1-1a577d6c-7389-4217-883f-89535032ae02 with group > instance id None) (kafka.coordinator.group.GroupCoordinator) > [data-plane-kafka-request-handler-5] > 2021-03-15 13:57:37,266 INFO [GroupCoordinator 1]: Stabilized group > debugconsumerlag generation 1 (__consumer_offsets-31) > (kafka.coordinator.group.GroupCoordinator) [executor-Rebalance] > 2021-03-15 13:57:37,784 INFO [GroupCoordinator 1]: Assignment received > from leader for group debugconsumerlag for generation 1 > (kafka.coordinator.group.GroupCoordinator) > [data-plane-kafka-request-handler-3] > 2021-03-15 14:07:43,822 INFO [GroupCoordinator 1]: Preparing to rebalance > group debugconsumerlag in state PreparingRebalance with old generation 1 > (__consumer_offsets-31) (reason: Adding new member > consumer-debugconsumerlag-1-e2e57bf6-6cbc-4dba-81d4-d7e58219c23f with group > instance id None) (kafka.coordinator.group.GroupCoordinator) > [data-plane-kafka-request-handler-1] > 2021-03-15 14:07:46,530 INFO [GroupCoordinator 1]: Stabilized group > debugconsumerlag generation 2 (__consumer_offsets-31) > (kafka.coordinator.group.GroupCoordinator) > [data-plane-kafka-request-handler-1] > 2021-03-15 14:07:46,675 INFO [GroupCoordinator 1]: Assignment received > from leader for group debugconsumerlag for generation 2 > (kafka.coordinator.group.GroupCoordinator) > [data-plane-kafka-request-handler-3] > > Kind regards, >
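If the goal is to reduce rebalance churn as pods come and go, one option (assuming brokers and clients on Kafka 2.3 or later) is static membership via `group.instance.id`. These are real consumer config keys; the group name is taken from the logs above, while the pod-name source and timeout value are illustrative:

```java
import java.util.Properties;

// Sketch: static membership gives each pod a stable identity, so restarts and
// rolling scale operations do not each force a full rebalance as long as the
// member rejoins within session.timeout.ms. Assumes StatefulSet-style stable
// pod names are available (e.g. via the HOSTNAME environment variable).
public class StaticMembershipConfig {
    public static Properties build(String podName) {
        Properties props = new Properties();
        props.put("group.id", "debugconsumerlag");
        props.put("group.instance.id", podName);  // stable per-pod identity
        props.put("session.timeout.ms", "30000"); // tolerate brief pod restarts
        return props;
    }

    public static void main(String[] args) {
        Properties p = build(System.getenv().getOrDefault("HOSTNAME", "consumer-0"));
        System.out.println(p.getProperty("group.instance.id"));
    }
}
```

Note this does not explain the 10-minute gap between the two joins in the logs, which is the more suspicious symptom here.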
Re: [ANNOUNCE] New committer: Tom Bentley
Congratulations, Tom! -Bill On Mon, Mar 15, 2021 at 2:08 PM Bruno Cadonna wrote: > Congrats, Tom! > > Best, > Bruno > > On 15.03.21 18:59, Mickael Maison wrote: > > Hi all, > > > > The PMC for Apache Kafka has invited Tom Bentley as a committer, and > > we are excited to announce that he accepted! > > > > Tom first contributed to Apache Kafka in June 2017 and has been > > actively contributing since February 2020. > > He has accumulated 52 commits and worked on a number of KIPs. Here are > > some of the most significant ones: > > KIP-183: Change PreferredReplicaLeaderElectionCommand to use > AdminClient > > KIP-195: AdminClient.createPartitions > > KIP-585: Filter and Conditional SMTs > > KIP-621: Deprecate and replace DescribeLogDirsResult.all() and > .values() > > KIP-707: The future of KafkaFuture (still in discussion) > > > > In addition, he is very active on the mailing list and has helped > > review many KIPs. > > > > Congratulations Tom and thanks for all the contributions! > > >
Re: [ANNOUNCE] New committer: Tom Bentley
Congrats, Tom! Best, Bruno On 15.03.21 18:59, Mickael Maison wrote: Hi all, The PMC for Apache Kafka has invited Tom Bentley as a committer, and we are excited to announce that he accepted! Tom first contributed to Apache Kafka in June 2017 and has been actively contributing since February 2020. He has accumulated 52 commits and worked on a number of KIPs. Here are some of the most significant ones: KIP-183: Change PreferredReplicaLeaderElectionCommand to use AdminClient KIP-195: AdminClient.createPartitions KIP-585: Filter and Conditional SMTs KIP-621: Deprecate and replace DescribeLogDirsResult.all() and .values() KIP-707: The future of KafkaFuture (still in discussion) In addition, he is very active on the mailing list and has helped review many KIPs. Congratulations Tom and thanks for all the contributions!
Re: [ANNOUNCE] New committer: Tom Bentley
Congratulations Tom! -- Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff On Mon, 15 Mar 2021 at 18:00, Mickael Maison wrote: > Hi all, > > The PMC for Apache Kafka has invited Tom Bentley as a committer, and > we are excited to announce that he accepted! > > Tom first contributed to Apache Kafka in June 2017 and has been > actively contributing since February 2020. > He has accumulated 52 commits and worked on a number of KIPs. Here are > some of the most significant ones: >KIP-183: Change PreferredReplicaLeaderElectionCommand to use AdminClient >KIP-195: AdminClient.createPartitions >KIP-585: Filter and Conditional SMTs >KIP-621: Deprecate and replace DescribeLogDirsResult.all() and .values() >KIP-707: The future of KafkaFuture (still in discussion) > > In addition, he is very active on the mailing list and has helped > review many KIPs. > > Congratulations Tom and thanks for all the contributions! >
[ANNOUNCE] New committer: Tom Bentley
Hi all, The PMC for Apache Kafka has invited Tom Bentley as a committer, and we are excited to announce that he accepted! Tom first contributed to Apache Kafka in June 2017 and has been actively contributing since February 2020. He has accumulated 52 commits and worked on a number of KIPs. Here are some of the most significant ones: KIP-183: Change PreferredReplicaLeaderElectionCommand to use AdminClient KIP-195: AdminClient.createPartitions KIP-585: Filter and Conditional SMTs KIP-621: Deprecate and replace DescribeLogDirsResult.all() and .values() KIP-707: The future of KafkaFuture (still in discussion) In addition, he is very active on the mailing list and has helped review many KIPs. Congratulations Tom and thanks for all the contributions!
Re: Error upgrading KafkaStreams
Hi Bruno Yes, cleaning up before upgrading is probably what I'm gonna do. I was just trying to understand what's going on, as this shouldn't be required. Thanks for your help Murilo On Mon, 15 Mar 2021 at 11:16, Bruno Cadonna wrote: > Hi Murilo, > > OK, now I see why you do not get an error in the second case in your > small environment where you cleaned up before upgrading. You would > restore from the earliest offset anyway and that is defined by the > earliest offset at the broker and that always exists. Hence, no out of > range exception is thrown. > > I am wondering why you get an out of range exception after upgrading > without clean up, though. > > A solution would be to clean up before upgrading in your large > environment. I do not know if this is a viable solution for you. > > Best, > Bruno > > On 15.03.21 16:01, Murilo Tavares wrote: > > Hi Bruno > > We have an environment variable that, when set, will call > > KafkaStreams.cleanUp() and sleep. > > The changelog topic is an internal KafkaStreams topic, for which I'm not > > changing any policies. > > It should be some default policy for a KTable in my understanding. > > Thanks > > Murilo > > > > > > > > On Mon, 15 Mar 2021 at 10:20, Bruno Cadonna > > wrote: > > > >> Hi Murilo, > >> > >> A couple of questions: > >> > >> 1. What do you mean exactly with clean up? > >> 2. Do you have a cleanup policy specified on the changelog topics? > >> > >> Best, > >> Bruno > >> > >> On 15.03.21 15:03, Murilo Tavares wrote: > >>> Hi Bruno > >>> No, I haven't tested resetting the application before upgrading on my > >> large > >>> environment. But I was able to reproduce it in my dev environment, > which > >> is > >>> way smaller. > >>> This is what I did: > >>> - Clean up and downgrade to 2.4. > >>> - Let it catch up; > >>> - upgrade to 2.7; Same errors, but it caught up after a while; > >>> > >>> Then I tried these steps: > >>> - Clean up and downgrade to 2.4. > >>> - Let it catch up; > >>> - Clean up and upgrade to 2.7. 
No error this time. > >>> > >>> Thanks > >>> Murilo > >>> > >>> On Mon, 15 Mar 2021 at 09:53, Bruno Cadonna > > >>> wrote: > >>> > Hi Murilo, > > Did you retry to upgrade again after you reset the application? Did it > work? > > Best, > Bruno > > On 15.03.21 14:26, Murilo Tavares wrote: > > Hi Bruno > > Thanks for your response. > > No, I did not reset the application prior to upgrading. That was > simply > > upgrading KafkaStreams from 2.4 to 2.7. > > > > I was able to reproduce it on a smaller environment, and it does > indeed > > recover. > > In a large environment, though, it keeps like that for hours. In this > same > > large environment, I had to downgrade the application, and when doing > that > > I did reset the application, which just took a few minutes. > > > > Thanks > > Murilo > > > > On Mon, 15 Mar 2021 at 06:21, Bruno Cadonna > >>> > > wrote: > > > >> Hi Murilo, > >> > >> No, you do not need any special procedure to upgrade from 2.4 to > 2.7. > >> > >> What you see in the logs is not an error but a warning. It should > not > >> block you on startup forever. The warning says that the local states > >> of > >> task 7_17 are corrupted because the offset you want to fetch of the > >> state changelog topic partition > >> my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17 is > >> larger > >> or smaller than the offsets that exist on the brokers for that > >> partition. If Streams runs into such an exception it will recreate > the > >> state from scratch which might take a while depending on the size of > >> the > >> state. > >> > >> The root cause of this warning is not clear from the information you > >> gave. Did you maybe reset the application but not wipe out the local > >> state stores? > >> > >> Best, > >> Bruno > >> > >> On 12.03.21 19:11, Murilo Tavares wrote: > >>> Hi > >>> I have Kafka brokers running on version 2.4.1, with a KafkaStreams > >> app > on > >>> 2.4.0. 
> >>> I'm trying to upgrade my KafkaStreams to v2.7.0, but I got my > >> instances > >>> stuck on startup. > >>> In my understanding, I don't need any special procedure to upgraded > from > >>> KStreams 2.4.0 to 2.7.0, right? > >>> > >>> The following error stands out for me: > >>> > >>> 2021-03-12 16:23:52.005 [...] WARN > >>> org.apache.kafka.streams.processor.internals.StreamThread - > >> stream-thread > >>> [...] Detected the states of tasks > >>> > >> {7_17=[my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17]} > >> are > >>> corrupted. Will close the task as dirty and re-create and bootstrap > from > >>> scratch. > >>> org.apache.kafka.streams.errors.TaskCorruptedException: Tasks with > >>> changelogs > >>> > >> {7_17=[my-as
Re: Error upgrading KafkaStreams
Hi Murilo, OK, now I see why you do not get an error in the second case in your small environment where you cleaned up before upgrading. You would restore from the earliest offset anyway, and that is defined by the earliest offset at the broker, which always exists. Hence, no out-of-range exception is thrown. I am wondering why you get an out-of-range exception after upgrading without clean up, though. A solution would be to clean up before upgrading in your large environment. I do not know if this is a viable solution for you. Best, Bruno On 15.03.21 16:01, Murilo Tavares wrote: Hi Bruno We have an environment variable that, when set, will call KafkaStreams.cleanUp() and sleep. The changelog topic is an internal KafkaStreams topic, for which I'm not changing any policies. It should be some default policy for a KTable in my understanding. Thanks Murilo On Mon, 15 Mar 2021 at 10:20, Bruno Cadonna wrote: Hi Murilo, A couple of questions: 1. What do you mean exactly with clean up? 2. Do you have a cleanup policy specified on the changelog topics? Best, Bruno On 15.03.21 15:03, Murilo Tavares wrote: Hi Bruno No, I haven't tested resetting the application before upgrading on my large environment. But I was able to reproduce it in my dev environment, which is way smaller. This is what I did: - Clean up and downgrade to 2.4. - Let it catch up; - upgrade to 2.7; Same errors, but it caught up after a while; Then I tried these steps: - Clean up and downgrade to 2.4. - Let it catch up; - Clean up and upgrade to 2.7. No error this time. Thanks Murilo On Mon, 15 Mar 2021 at 09:53, Bruno Cadonna wrote: Hi Murilo, Did you retry to upgrade again after you reset the application? Did it work? Best, Bruno On 15.03.21 14:26, Murilo Tavares wrote: Hi Bruno Thanks for your response. No, I did not reset the application prior to upgrading. That was simply upgrading KafkaStreams from 2.4 to 2.7. I was able to reproduce it on a smaller environment, and it does indeed recover. 
In a large environment, though, it keeps like that for hours. In this same large environment, I had to downgrade the application, and when doing that I did reset the application, which just took a few minutes. Thanks Murilo On Mon, 15 Mar 2021 at 06:21, Bruno Cadonna wrote: Hi Murilo, No, you do not need any special procedure to upgrade from 2.4 to 2.7. What you see in the logs is not an error but a warning. It should not block you on startup forever. The warning says that the local states of task 7_17 are corrupted because the offset you want to fetch of the state changelog topic partition my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17 is larger or smaller than the offsets that exist on the brokers for that partition. If Streams runs into such an exception it will recreate the state from scratch, which might take a while depending on the size of the state. The root cause of this warning is not clear from the information you gave. Did you maybe reset the application but not wipe out the local state stores? Best, Bruno On 12.03.21 19:11, Murilo Tavares wrote: Hi I have Kafka brokers running on version 2.4.1, with a KafkaStreams app on 2.4.0. I'm trying to upgrade my KafkaStreams to v2.7.0, but I got my instances stuck on startup. In my understanding, I don't need any special procedure to upgrade from KStreams 2.4.0 to 2.7.0, right? The following error stands out for me: 2021-03-12 16:23:52.005 [...] WARN org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [...] Detected the states of tasks {7_17=[my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17]} are corrupted. Will close the task as dirty and re-create and bootstrap from scratch. 
org.apache.kafka.streams.errors.TaskCorruptedException: Tasks with changelogs {7_17=[my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17]} are corrupted and hence needs to be re-initialized at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:446) ~[app.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:744) ~[app.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:625) ~[app.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) [app.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512) [app.jar:?] Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Fetch position FetchPosition{offset=738, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[...:9092 (id: 3 rack: euw1-az1)], epoch=27}} is out of range for partition my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17 at org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange(Fetcher.java:1366)
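The OffsetOutOfRangeException in the stack trace boils down to a range check like the following. This is a simplified illustration, not the actual StoreChangelogReader code:

```java
// Simplified version of the condition that sends Kafka Streams down the
// "task corrupted" path during restore: the changelog fetch position falls
// outside the offset range the broker still holds, so the local state is
// closed as dirty and rebuilt from scratch. Illustrative logic only.
public class OffsetRangeCheck {
    static boolean outOfRange(long fetchOffset, long logStartOffset, long logEndOffset) {
        return fetchOffset < logStartOffset || fetchOffset > logEndOffset;
    }

    public static void main(String[] args) {
        // The log above shows fetch position 738 being rejected; that happens
        // when, for example, the broker's earliest retained offset for the
        // changelog partition is already past 738.
        System.out.println(outOfRange(738, 1000, 5000)); // true
    }
}
```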
recover messages marked for deletion
Hi, a newbie error, but I can't find how to fix it: I updated the retention.ms topic config with a value shorter than what I wanted, and marked more messages for deletion than I intended. The files are still in the topic directory, but I can't find how to tell Kafka to move the "first valid offset" backwards and recover those messages. Any suggestions? Thanks in advance, Regards Jokin
Re: Redis as state store
Hi Alex, I guess wiping out the state directory is easier code-wise, faster, and/or at the time of development the developers did not design for remote state stores. But I actually do not know the exact reason. Off the top of my head, I do not know how to solve this for remote state stores. Using the uncaught exception handler is not good, because a computing node could fail without giving the JVM the opportunity to throw an exception. In your tests, try to increase the commit interval to a high value and see if you get inconsistencies. You should get an inconsistency if the state store maintains counts for keys and after the last commit before the failure, the Streams app puts an event with a new key K with value 1 into the state store. After failover, Streams would put the same event with key K again into the state store. If the state store deleted all of its data, Streams would put again value 1, but if the state store did not delete all data, Streams would put value 2 which is wrong because it would count the same event twice. Best, Bruno On 15.03.21 15:20, Alex Craig wrote: Bruno, Thanks for the info! That makes sense. Of course now I have more questions. :) Do you know why this is being done outside of the state store API? I assume there are reasons why a "deleteAll()" type of function wouldn't work, thereby allowing a state store to purge itself? And maybe more importantly, is there a way to achieve a similar behavior with a 3rd party store? I'm not sure if hooking into the uncaught exception handler might be a good way to purge/drop a state store in the event of a fatal error? I did set up a MongoDB state store recently as part of a POC and was testing it with EOS enabled. (forcing crashes to occur and checking that the result of my aggregation was still accurate) I was unable to cause inconsistent data in the mongo store (which is good!), though of course I may just have been getting lucky. 
Thanks again for your help, Alex On Mon, Mar 15, 2021 at 8:59 AM Pushkar Deole wrote: Bruno, I tried to explain this in 'kafka user' language through the above-mentioned scenario, hope I put it properly :-) and correct me if I am wrong On Mon, Mar 15, 2021 at 7:23 PM Pushkar Deole wrote: This is what I understand could be the issue with external state store: kafka stream application consumes source topic, does processing, stores state to kafka state store (this is backed by topic) and before producing event on destination topic, the application fails with some issue. In this case, the transaction has failed, so kafka guarantees either all or none, means offset written to source topic, state written to state store topic, output produced on destination topic... all of these happen or none of these and in this failure scenario it is none of these. Assume you have redis state store, and you updated the state into redis and stream application failed. Now, you have source topic and destination topic consistent i.e. offset is not committed to source topic and output not produced on destination topic, but your redis state store is inconsistent with that since it is external state store and kafka can't guarantee rollback of state written there On Mon, Mar 15, 2021 at 6:30 PM Alex Craig wrote: " Another issue with 3rd party state stores could be violation of exactly-once guarantee provided by kafka streams in the event of a failure of streams application instance" I've heard this before but would love to know more about how a custom state store would be at any greater risk than RocksDB as far as exactly-once guarantees are concerned. They all implement the same interface, so as long as you're correctly implementing get(), put(), delete(), flush(), etc, you should be fine right? In other words, I don't think there is any special "exactly once magic" that is baked into the RocksDB store code. 
I could be wrong though so I'd love to hear people's thoughts, thanks, Alex C On Sun, Mar 14, 2021 at 4:58 PM Parthasarathy, Mohan wrote: Thanks for the responses. In the worst case, I might have to keep both rocksdb for local store and keep an external store like Redis. -mohan On 3/13/21, 8:53 PM, "Pushkar Deole" wrote: Another issue with 3rd party state stores could be violation of exactly-once guarantee provided by kafka streams in the event of a failure of streams application instance. Kafka provides exactly once guarantee for consumer -> process -> produce through transactions and with the use of state store like redis, this guarantee is weaker On Sat, Mar 13, 2021 at 5:28 AM Guozhang Wang wrote: > Hello Mohan, > > I think what you had in mind works with Redis, since it is a remote state > store engine, it does not have the co-partitioning requirements as local > state stores. > > One thing you'd need to tune KS though is that with remote stores, the > processing latency may
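Bruno's counting scenario can be simulated in plain Java. This is an illustration only, not Kafka Streams code: the store and changelog are stand-in HashMaps, and the method name is hypothetical. It shows why replaying an uncommitted event against a state store that was not wiped double-counts, while wiping and restoring from the committed changelog does not.

```java
import java.util.HashMap;
import java.util.Map;

public class ReplaySimulation {

    // Returns the count for key "K" after a crash-and-replay cycle.
    // wipeOnFailure = true models Streams wiping its local RocksDB state;
    // false models a remote store (e.g. Redis) that keeps the dirty write.
    static int countAfterCrash(boolean wipeOnFailure) {
        Map<String, Integer> store = new HashMap<>();
        Map<String, Integer> changelog = new HashMap<>(); // committed state only

        // Uncommitted processing before the crash: first event for key "K".
        store.merge("K", 1, Integer::sum);
        // Crash happens before the transaction commits: offsets, output and
        // changelog are all rolled back, but `store` keeps the dirty write.

        if (wipeOnFailure) {
            store.clear();
            store.putAll(changelog); // restore from the committed changelog (empty here)
        }

        // EOS replays the same event after failover.
        store.merge("K", 1, Integer::sum);
        return store.get("K");
    }

    public static void main(String[] args) {
        System.out.println("wiped:     " + countAfterCrash(true));  // correct count
        System.out.println("not wiped: " + countAfterCrash(false)); // double-counted
    }
}
```

This is exactly why a `deleteAll()`-style hook on the store interface (or falling back to per-key deletes) matters for remote stores: without it there is no way to get back to the last committed state before replay.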
Re: Error upgrading KafkaStreams
Hi Bruno We have an environment variable that, when set, will call KafkaStreams.cleanUp() and sleep. The changelog topic is an internal KafkaStreams topic, for which I'm not changing any policies. It should be some default policy for a KTable in my understanding. Thanks Murilo On Mon, 15 Mar 2021 at 10:20, Bruno Cadonna wrote: > Hi Murilo, > > A couple of questions: > > 1. What do you mean exactly with clean up? > 2. Do you have a cleanup policy specified on the changelog topics? > > Best, > Bruno > > On 15.03.21 15:03, Murilo Tavares wrote: > > Hi Bruno > > No, I haven't tested resetting the application before upgrading on my > large > > environment. But I was able to reproduce it in my dev environment, which > is > > way smaller. > > This is what I did: > > - Clean up and downgrade to 2.4. > > - Let it catch up; > > - upgrade to 2.7; Same errors, but it caught up after a while; > > > > Then I tried these steps: > > - Clean up and downgrade to 2.4. > > - Let it catch up; > > - Clean up and upgrade to 2.7. No error this time. > > > > Thanks > > Murilo > > > > On Mon, 15 Mar 2021 at 09:53, Bruno Cadonna > > wrote: > > > >> Hi Murilo, > >> > >> Did you retry to upgrade again after you reset the application? Did it > >> work? > >> > >> Best, > >> Bruno > >> > >> On 15.03.21 14:26, Murilo Tavares wrote: > >>> Hi Bruno > >>> Thanks for your response. > >>> No, I did not reset the application prior to upgrading. That was simply > >>> upgrading KafkaStreams from 2.4 to 2.7. > >>> > >>> I was able to reproduce it on a smaller environment, and it does indeed > >>> recover. > >>> In a large environment, though, it keeps like that for hours. In this > >> same > >>> large environment, I had to downgrade the application, and when doing > >> that > >>> I did reset the application, which just took a few minutes. 
> >>> > >>> Thanks > >>> Murilo > >>> > >>> On Mon, 15 Mar 2021 at 06:21, Bruno Cadonna > > >>> wrote: > >>> > Hi Murilo, > > No, you do not need any special procedure to upgrade from 2.4 to 2.7. > > What you see in the logs is not an error but a warning. It should not > block you on startup forever. The warning says that the local states > of > task 7_17 are corrupted because the offset you want to fetch of the > state changelog topic partition > my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17 is > larger > or smaller than the offsets that exist on the brokers for that > partition. If Streams runs into such an exception it will recreate the > state from scratch which might take a while depending on the size of > the > state. > > The root cause of this warning is not clear from the information you > gave. Did you maybe reset the application but not wipe out the local > state stores? > > Best, > Bruno > > On 12.03.21 19:11, Murilo Tavares wrote: > > Hi > > I have Kafka brokers running on version 2.4.1, with a KafkaStreams > app > >> on > > 2.4.0. > > I'm trying to upgrade my KafkaStreams to v2.7.0, but I got my > instances > > stuck on startup. > > In my understanding, I don't need any special procedure to upgraded > >> from > > KStreams 2.4.0 to 2.7.0, right? > > > > The following error stands out for me: > > > > 2021-03-12 16:23:52.005 [...] WARN > > org.apache.kafka.streams.processor.internals.StreamThread - > stream-thread > > [...] Detected the states of tasks > > > {7_17=[my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17]} > are > > corrupted. Will close the task as dirty and re-create and bootstrap > >> from > > scratch. 
> > org.apache.kafka.streams.errors.TaskCorruptedException: Tasks with > > changelogs > > > {7_17=[my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17]} > are > > corrupted and hence needs to be re-initialized > > at > > > > >> > org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:446) > > ~[app.jar:?] > > at > > > > >> > org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:744) > > ~[app.jar:?] > > at > > > > >> > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:625) > > ~[app.jar:?] > > at > > > > >> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) > > [app.jar:?] > > at > > > > >> > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512) > > [app.jar:?] > > Caused by: > org.apache.kafka.clients.consumer.OffsetOutOfRangeException: > > Fetch position FetchPosition{offset=738, offsetEpoch=Optional.empty, > > currentLeader=LeaderAndEpoch{leader=Optional[...:9092 (id: 3 rack: > > euw1-az1)], epoch=27}} is out of range for partition > > my-assem
Rebalancing and scaling of consumers on Kubernetes: instantaneous scale to x consumer replicas ==> x rebalances?
Hi all, I have a Kafka consumer pod running on Kubernetes. I executed the command kubectl scale consumerName --replicas=2, and as shown in the logs below, two separate rebalancing processes were triggered. So if the number of consumer replicas is scaled to 100, are one hundred separate rebalances going to be triggered? Is that accurate? Am I missing something? Is there any workaround to trigger a single rebalance regardless of the number of replicas in the scale command? group coordinator logs = 2021-03-15 13:57:34,230 INFO [GroupCoordinator 1]: Preparing to rebalance group debugconsumerlag in state PreparingRebalance with old generation 0 (__consumer_offsets-31) (reason: Adding new member consumer-debugconsumerlag-1-1a577d6c-7389-4217-883f-89535032ae02 with group instance id None) (kafka.coordinator.group.GroupCoordinator) [data-plane-kafka-request-handler-5] 2021-03-15 13:57:37,266 INFO [GroupCoordinator 1]: Stabilized group debugconsumerlag generation 1 (__consumer_offsets-31) (kafka.coordinator.group.GroupCoordinator) [executor-Rebalance] 2021-03-15 13:57:37,784 INFO [GroupCoordinator 1]: Assignment received from leader for group debugconsumerlag for generation 1 (kafka.coordinator.group.GroupCoordinator) [data-plane-kafka-request-handler-3] 2021-03-15 14:07:43,822 INFO [GroupCoordinator 1]: Preparing to rebalance group debugconsumerlag in state PreparingRebalance with old generation 1 (__consumer_offsets-31) (reason: Adding new member consumer-debugconsumerlag-1-e2e57bf6-6cbc-4dba-81d4-d7e58219c23f with group instance id None) (kafka.coordinator.group.GroupCoordinator) [data-plane-kafka-request-handler-1] 2021-03-15 14:07:46,530 INFO [GroupCoordinator 1]: Stabilized group debugconsumerlag generation 2 (__consumer_offsets-31) (kafka.coordinator.group.GroupCoordinator) [data-plane-kafka-request-handler-1] 2021-03-15 14:07:46,675 INFO [GroupCoordinator 1]: Assignment received from leader for group debugconsumerlag for generation 2 (kafka.coordinator.group.GroupCoordinator) 
[data-plane-kafka-request-handler-3] Kind regards,
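As background: each member joining the group does start its own rebalance (the broker-side `group.initial.rebalance.delay.ms` batches joiners only for a brand-new group), so there is no client setting that collapses N joins into one rebalance. Two consumer configs can, however, make scale-out rebalances much cheaper. The values and instance id below are illustrative assumptions for Kafka clients 2.4 or later:

```properties
# Cooperative rebalancing: each rebalance only revokes the partitions that
# actually move, so N member joins no longer stop the whole group N times.
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor

# Static membership (KIP-345): a pod that restarts or rejoins with the same
# instance id does not trigger a rebalance at all. Derive the id from a
# stable pod identity (e.g. a StatefulSet ordinal), not a random suffix.
group.instance.id=debugconsumerlag-pod-0
session.timeout.ms=30000
```

Static membership does not help the very first join of a brand-new replica, but it removes the rebalance storm on rolling restarts and pod rescheduling, which is usually the bigger problem on Kubernetes.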
Re: Error upgrading KafkaStreams
Hi Murilo, A couple of questions: 1. What do you mean exactly with clean up? 2. Do you have a cleanup policy specified on the changelog topics? Best, Bruno On 15.03.21 15:03, Murilo Tavares wrote: Hi Bruno No, I haven't tested resetting the application before upgrading on my large environment. But I was able to reproduce it in my dev environment, which is way smaller. This is what I did: - Clean up and downgrade to 2.4. - Let it catch up; - upgrade to 2.7; Same errors, but it caught up after a while; Then I tried these steps: - Clean up and downgrade to 2.4. - Let it catch up; - Clean up and upgrade to 2.7. No error this time. Thanks Murilo On Mon, 15 Mar 2021 at 09:53, Bruno Cadonna wrote: Hi Murilo, Did you retry to upgrade again after you reset the application? Did it work? Best, Bruno On 15.03.21 14:26, Murilo Tavares wrote: Hi Bruno Thanks for your response. No, I did not reset the application prior to upgrading. That was simply upgrading KafkaStreams from 2.4 to 2.7. I was able to reproduce it on a smaller environment, and it does indeed recover. In a large environment, though, it keeps like that for hours. In this same large environment, I had to downgrade the application, and when doing that I did reset the application, which just took a few minutes. Thanks Murilo On Mon, 15 Mar 2021 at 06:21, Bruno Cadonna wrote: Hi Murilo, No, you do not need any special procedure to upgrade from 2.4 to 2.7. What you see in the logs is not an error but a warning. It should not block you on startup forever. The warning says that the local states of task 7_17 are corrupted because the offset you want to fetch of the state changelog topic partition my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17 is larger or smaller than the offsets that exist on the brokers for that partition. If Streams runs into such an exception it will recreate the state from scratch which might take a while depending on the size of the state. 
The root cause of this warning is not clear from the information you gave. Did you maybe reset the application but not wipe out the local state stores? Best, Bruno On 12.03.21 19:11, Murilo Tavares wrote: Hi I have Kafka brokers running on version 2.4.1, with a KafkaStreams app on 2.4.0. I'm trying to upgrade my KafkaStreams to v2.7.0, but I got my instances stuck on startup. In my understanding, I don't need any special procedure to upgraded from KStreams 2.4.0 to 2.7.0, right? The following error stands out for me: 2021-03-12 16:23:52.005 [...] WARN org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [...] Detected the states of tasks {7_17=[my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17]} are corrupted. Will close the task as dirty and re-create and bootstrap from scratch. org.apache.kafka.streams.errors.TaskCorruptedException: Tasks with changelogs {7_17=[my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17]} are corrupted and hence needs to be re-initialized at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:446) ~[app.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:744) ~[app.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:625) ~[app.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) [app.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512) [app.jar:?] 
Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Fetch position FetchPosition{offset=738, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[...:9092 (id: 3 rack: euw1-az1)], epoch=27}} is out of range for partition my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17 at org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange(Fetcher.java:1366) ~[app.jar:?] at org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1318) ~[app.jar:?] at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:614) ~[app.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1272) ~[app.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1233) ~[app.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206) ~[app.jar:?] at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:433) ~[app.jar:?] ... 4 more Any suggestions on how to upgrade? Thanks Murilo
Re: Redis as state store
Bruno, Thanks for the info! That makes sense. Of course now I have more questions. :) Do you know why this is being done outside of the state store API? I assume there are reasons why a "deleteAll()" type of function wouldn't work, thereby allowing a state store to purge itself? And maybe more importantly, is there a way to achieve a similar behavior with a 3rd-party store? I'm not sure if hooking into the uncaught exception handler might be a good way to purge/drop a state store in the event of a fatal error? I did set up a MongoDB state store recently as part of a POC and was testing it with EOS enabled (forcing crashes to occur and checking that the result of my aggregation was still accurate). I was unable to cause inconsistent data in the mongo store (which is good!), though of course I may just have been getting lucky. Thanks again for your help, Alex On Mon, Mar 15, 2021 at 8:59 AM Pushkar Deole wrote: > Bruno, > > i tried to explain this in 'kafka user's language through above mentioned > scenario, hope i put it properly -:) and correct me if i am wrong > > On Mon, Mar 15, 2021 at 7:23 PM Pushkar Deole > wrote: > > > This is what I understand could be the issue with external state store: > > > > kafka stream application consumes source topic, does processing, stores > > state to kafka state store (this is backed by topic) and before producing > > event on destination topic, the application fails with some issue. In > > this case, the transaction has failed, so kafka guarantees either all or > > none, means offset written to source topic, state written to state store > > topic, output produced on destination topic... all of these happen or > none > > of these and in this failure scenario it is none of these. > > > > Assume you have redis state store, and you updated the state into redis > > and stream application failed. Now, you have source topic and destination > > topic consistent i.e. 
offset is not committed to source topic and output > > not produced on destination topic, but you redis state store is > > inconsistent with that since it is external state store and kafka can't > > guarantee rollback ot state written there > > > > On Mon, Mar 15, 2021 at 6:30 PM Alex Craig > wrote: > > > >> " Another issue with 3rd party state stores could be violation of > >> exactly-once guarantee provided by kafka streams in the event of a > failure > >> of streams application instance" > >> > >> I've heard this before but would love to know more about how a custom > >> state > >> store would be at any greater risk than RocksDB as far as exactly-once > >> guarantees are concerned. They all implement the same interface, so as > >> long as you're correctly implementing get(), put(), delete(), flush(), > >> etc, > >> you should be fine right? In other words, I don't think there is any > >> special "exactly once magic" that is baked into the RocksDB store > code. I > >> could be wrong though so I'd love to hear people's thoughts, thanks, > >> > >> Alex C > >> > >> On Sun, Mar 14, 2021 at 4:58 PM Parthasarathy, Mohan > >> wrote: > >> > >> > Thanks for the responses. In the worst case, I might have to keep both > >> > rocksdb for local store and keep an external store like Redis. > >> > > >> > -mohan > >> > > >> > > >> > On 3/13/21, 8:53 PM, "Pushkar Deole" wrote: > >> > > >> > Another issue with 3rd party state stores could be violation of > >> > exactly-once guarantee provided by kafka streams in the event of a > >> > failure > >> > of streams application instance. 
> >> > Kafka provides exactly once guarantee for consumer -> process -> > >> > produce > >> > through transactions and with the use of state store like redis, > >> this > >> > guarantee is weaker > >> > > >> > On Sat, Mar 13, 2021 at 5:28 AM Guozhang Wang > > >> > wrote: > >> > > >> > > Hello Mohan, > >> > > > >> > > I think what you had in mind works with Redis, since it is a > >> remote > >> > state > >> > > store engine, it does not have the co-partitioning requirements > as > >> > local > >> > > state stores. > >> > > > >> > > One thing you'd need to tune KS though is that with remote > stores, > >> > the > >> > > processing latency may be larger, and since Kafka Streams > process > >> all > >> > > records of a single partition in order, synchronously, you may > >> need > >> > to tune > >> > > the poll interval configs etc to make sure KS would stay in the > >> > consumer > >> > > group and not trigger unnecessary rebalances. > >> > > > >> > > Guozhang > >> > > > >> > > On Thu, Mar 11, 2021 at 6:41 PM Parthasarathy, Mohan < > >> > mpart...@hpe.com> > >> > > wrote: > >> > > > >> > > > Hi, > >> > > > > >> > > > I have a use case where messages come in with some key gets > >> > assigned some > >> > > > partition and the state gets created. Later, key changes (but > >> still > >> > > > contains the old key in the mess
Re: Error upgrading KafkaStreams
Hi Bruno No, I haven't tested resetting the application before upgrading on my large environment. But I was able to reproduce it in my dev environment, which is way smaller. This is what I did: - Clean up and downgrade to 2.4. - Let it catch up; - upgrade to 2.7; Same errors, but it caught up after a while; Then I tried these steps: - Clean up and downgrade to 2.4. - Let it catch up; - Clean up and upgrade to 2.7. No error this time. Thanks Murilo On Mon, 15 Mar 2021 at 09:53, Bruno Cadonna wrote: > Hi Murilo, > > Did you retry to upgrade again after you reset the application? Did it > work? > > Best, > Bruno > > On 15.03.21 14:26, Murilo Tavares wrote: > > Hi Bruno > > Thanks for your response. > > No, I did not reset the application prior to upgrading. That was simply > > upgrading KafkaStreams from 2.4 to 2.7. > > > > I was able to reproduce it on a smaller environment, and it does indeed > > recover. > > In a large environment, though, it keeps like that for hours. In this > same > > large environment, I had to downgrade the application, and when doing > that > > I did reset the application, which just took a few minutes. > > > > Thanks > > Murilo > > > > On Mon, 15 Mar 2021 at 06:21, Bruno Cadonna > > wrote: > > > >> Hi Murilo, > >> > >> No, you do not need any special procedure to upgrade from 2.4 to 2.7. > >> > >> What you see in the logs is not an error but a warning. It should not > >> block you on startup forever. The warning says that the local states of > >> task 7_17 are corrupted because the offset you want to fetch of the > >> state changelog topic partition > >> my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17 is larger > >> or smaller than the offsets that exist on the brokers for that > >> partition. If Streams runs into such an exception it will recreate the > >> state from scratch which might take a while depending on the size of the > >> state. 
> >> > >> The root cause of this warning is not clear from the information you > >> gave. Did you maybe reset the application but not wipe out the local > >> state stores? > >> > >> Best, > >> Bruno > >> > >> On 12.03.21 19:11, Murilo Tavares wrote: > >>> Hi > >>> I have Kafka brokers running on version 2.4.1, with a KafkaStreams app > on > >>> 2.4.0. > >>> I'm trying to upgrade my KafkaStreams to v2.7.0, but I got my instances > >>> stuck on startup. > >>> In my understanding, I don't need any special procedure to upgraded > from > >>> KStreams 2.4.0 to 2.7.0, right? > >>> > >>> The following error stands out for me: > >>> > >>> 2021-03-12 16:23:52.005 [...] WARN > >>>org.apache.kafka.streams.processor.internals.StreamThread - > >> stream-thread > >>> [...] Detected the states of tasks > >>> {7_17=[my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17]} > >> are > >>> corrupted. Will close the task as dirty and re-create and bootstrap > from > >>> scratch. > >>> org.apache.kafka.streams.errors.TaskCorruptedException: Tasks with > >>> changelogs > >>> {7_17=[my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17]} > >> are > >>> corrupted and hence needs to be re-initialized > >>> at > >>> > >> > org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:446) > >>> ~[app.jar:?] > >>> at > >>> > >> > org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:744) > >>> ~[app.jar:?] > >>> at > >>> > >> > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:625) > >>> ~[app.jar:?] > >>> at > >>> > >> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) > >>> [app.jar:?] > >>> at > >>> > >> > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512) > >>> [app.jar:?] 
> >>> Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: > >>> Fetch position FetchPosition{offset=738, offsetEpoch=Optional.empty, > >>> currentLeader=LeaderAndEpoch{leader=Optional[...:9092 (id: 3 rack: > >>> euw1-az1)], epoch=27}} is out of range for partition > >>> my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17 > >>> at > >>> > >> > org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange(Fetcher.java:1366) > >>> ~[app.jar:?] > >>> at > >>> > >> > org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1318) > >>> ~[app.jar:?] > >>> at > >>> > >> > org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:614) > >>> ~[app.jar:?] > >>> at > >>> > >> > org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1272) > >>> ~[app.jar:?] > >>> at > >>> > >> > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1233) > >>> ~[app.jar:?] > >>> at > >>> > >> > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206) > >>> ~[app.jar:?] > >>> at > >>> > >> > org.apache.kafka.streams.processor.internals.StoreC
Re: Redis as state store
Bruno, i tried to explain this in 'kafka user's language through above mentioned scenario, hope i put it properly -:) and correct me if i am wrong On Mon, Mar 15, 2021 at 7:23 PM Pushkar Deole wrote: > This is what I understand could be the issue with external state store: > > kafka stream application consumes source topic, does processing, stores > state to kafka state store (this is backed by topic) and before producing > event on destination topic, the application fails with some issue. In > this case, the transaction has failed, so kafka guarantees either all or > none, means offset written to source topic, state written to state store > topic, output produced on destination topic... all of these happen or none > of these and in this failure scenario it is none of these. > > Assume you have redis state store, and you updated the state into redis > and stream application failed. Now, you have source topic and destination > topic consistent i.e. offset is not committed to source topic and output > not produced on destination topic, but you redis state store is > inconsistent with that since it is external state store and kafka can't > guarantee rollback ot state written there > > On Mon, Mar 15, 2021 at 6:30 PM Alex Craig wrote: > >> " Another issue with 3rd party state stores could be violation of >> exactly-once guarantee provided by kafka streams in the event of a failure >> of streams application instance" >> >> I've heard this before but would love to know more about how a custom >> state >> store would be at any greater risk than RocksDB as far as exactly-once >> guarantees are concerned. They all implement the same interface, so as >> long as you're correctly implementing get(), put(), delete(), flush(), >> etc, >> you should be fine right? In other words, I don't think there is any >> special "exactly once magic" that is baked into the RocksDB store code. 
I >> could be wrong though so I'd love to hear people's thoughts, thanks, >> >> Alex C >> >> On Sun, Mar 14, 2021 at 4:58 PM Parthasarathy, Mohan >> wrote: >> >> > Thanks for the responses. In the worst case, I might have to keep both >> > rocksdb for local store and keep an external store like Redis. >> > >> > -mohan >> > >> > >> > On 3/13/21, 8:53 PM, "Pushkar Deole" wrote: >> > >> > Another issue with 3rd party state stores could be violation of >> > exactly-once guarantee provided by kafka streams in the event of a >> > failure >> > of streams application instance. >> > Kafka provides exactly once guarantee for consumer -> process -> >> > produce >> > through transactions and with the use of state store like redis, >> this >> > guarantee is weaker >> > >> > On Sat, Mar 13, 2021 at 5:28 AM Guozhang Wang >> > wrote: >> > >> > > Hello Mohan, >> > > >> > > I think what you had in mind works with Redis, since it is a >> remote >> > state >> > > store engine, it does not have the co-partitioning requirements as >> > local >> > > state stores. >> > > >> > > One thing you'd need to tune KS though is that with remote stores, >> > the >> > > processing latency may be larger, and since Kafka Streams process >> all >> > > records of a single partition in order, synchronously, you may >> need >> > to tune >> > > the poll interval configs etc to make sure KS would stay in the >> > consumer >> > > group and not trigger unnecessary rebalances. >> > > >> > > Guozhang >> > > >> > > On Thu, Mar 11, 2021 at 6:41 PM Parthasarathy, Mohan < >> > mpart...@hpe.com> >> > > wrote: >> > > >> > > > Hi, >> > > > >> > > > I have a use case where messages come in with some key gets >> > assigned some >> > > > partition and the state gets created. Later, key changes (but >> still >> > > > contains the old key in the message) and gets sent to a >> different >> > > > partition. 
I want to be able to grab the old state using the old >> > key >> > > before >> > > > creating the new state on this instance. Redis as a state store >> > makes it >> > > > easy to implement this where I can simply do a lookup before >> > creating the >> > > > state. I see an implementation here : >> > > > >> > > >> > >> https://github.com/andreas-schroeder/redisks/tree/master/src/main/java/com/github/andreas_schroeder/redisks >> > > > >> > > > Has anyone tried this ? Any caveats. >> > > > >> > > > Thanks >> > > > Mohan >> > > > >> > > > >> > > >> > > -- >> > > -- Guozhang >> > > >> > >> > >> > >> >
Re: Redis as state store
This is what I understand could be the issue with external state store: the kafka stream application consumes the source topic, does processing, stores state to the kafka state store (this is backed by a topic) and, before producing an event on the destination topic, the application fails with some issue. In this case, the transaction has failed, so kafka guarantees either all or none, meaning offset written to source topic, state written to state store topic, output produced on destination topic... all of these happen or none of these, and in this failure scenario it is none of these. Assume you have a redis state store, and you updated the state into redis and the stream application failed. Now, you have source topic and destination topic consistent, i.e. the offset is not committed to the source topic and output is not produced on the destination topic, but your redis state store is inconsistent with that, since it is an external state store and kafka can't guarantee rollback of state written there On Mon, Mar 15, 2021 at 6:30 PM Alex Craig wrote: > " Another issue with 3rd party state stores could be violation of > exactly-once guarantee provided by kafka streams in the event of a failure > of streams application instance" > > I've heard this before but would love to know more about how a custom state > store would be at any greater risk than RocksDB as far as exactly-once > guarantees are concerned. They all implement the same interface, so as > long as you're correctly implementing get(), put(), delete(), flush(), etc, > you should be fine right? In other words, I don't think there is any > special "exactly once magic" that is baked into the RocksDB store code. I > could be wrong though so I'd love to hear people's thoughts, thanks, > > Alex C > > On Sun, Mar 14, 2021 at 4:58 PM Parthasarathy, Mohan > wrote: > > > Thanks for the responses. In the worst case, I might have to keep both > > rocksdb for local store and keep an external store like Redis. 
> > > > -mohan > > > > > > On 3/13/21, 8:53 PM, "Pushkar Deole" wrote: > > > > Another issue with 3rd party state stores could be violation of > > exactly-once guarantee provided by kafka streams in the event of a > > failure > > of streams application instance. > > Kafka provides exactly once guarantee for consumer -> process -> > > produce > > through transactions and with the use of state store like redis, this > > guarantee is weaker > > > > On Sat, Mar 13, 2021 at 5:28 AM Guozhang Wang > > wrote: > > > > > Hello Mohan, > > > > > > I think what you had in mind works with Redis, since it is a remote > > state > > > store engine, it does not have the co-partitioning requirements as > > local > > > state stores. > > > > > > One thing you'd need to tune KS though is that with remote stores, > > the > > > processing latency may be larger, and since Kafka Streams process > all > > > records of a single partition in order, synchronously, you may need > > to tune > > > the poll interval configs etc to make sure KS would stay in the > > consumer > > > group and not trigger unnecessary rebalances. > > > > > > Guozhang > > > > > > On Thu, Mar 11, 2021 at 6:41 PM Parthasarathy, Mohan < > > mpart...@hpe.com> > > > wrote: > > > > > > > Hi, > > > > > > > > I have a use case where messages come in with some key gets > > assigned some > > > > partition and the state gets created. Later, key changes (but > still > > > > contains the old key in the message) and gets sent to a different > > > > partition. I want to be able to grab the old state using the old > > key > > > before > > > > creating the new state on this instance. Redis as a state store > > makes it > > > > easy to implement this where I can simply do a lookup before > > creating the > > > > state. I see an implementation here : > > > > > > > > > > https://github.com/andreas-schroeder/redisks/tree/master/src/main/java/com/github/andreas_schroeder/redisks > > > > > > > > Has anyone tried this ? Any caveats. 
> > > > > > > > Thanks > > > > Mohan > > > > > > > > > > > > > > -- > > > -- Guozhang > > > > > > > > > >
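To make the failure scenario described above concrete, here is a minimal toy simulation in plain Java (it does not use the Kafka API; all names are illustrative). The Kafka transaction (source offset, changelog write, output record) aborts atomically, but a write to a remote store such as Redis is a side effect Kafka cannot roll back, so re-processing after a restart double-counts:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of why an *external* state store can diverge under exactly-once:
// the Kafka transaction commits or aborts atomically, but the remote write
// happens immediately, outside the transaction.
public class ExternalStoreEosDemo {
    // Stands in for Redis: mutated outside the transaction.
    public static final Map<String, Integer> remoteStore = new HashMap<>();

    public static void process(String key, boolean failBeforeCommit) {
        // 1) update the external store (visible immediately, not transactional)
        remoteStore.merge(key, 1, Integer::sum);
        // 2) here the Kafka transaction would commit offset + changelog + output
        if (failBeforeCommit) {
            // txn aborts: offset NOT committed, output NOT produced ...
            // ... but the remote write above is already visible.
            return;
        }
    }

    public static void main(String[] args) {
        process("K", true);   // crash before the transaction commits
        process("K", false);  // the same record is re-processed after restart
        // The count is 2 although the record was logically processed once.
        System.out.println(remoteStore.get("K")); // prints 2
    }
}
```

With a local RocksDB store the same crash is harmless, because Streams wipes the local state and rebuilds it from the changelog topic, which only contains committed writes.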
Re: Error upgrading KafkaStreams
Hi Murilo, Did you retry to upgrade again after you reset the application? Did it work? Best, Bruno On 15.03.21 14:26, Murilo Tavares wrote: Hi Bruno Thanks for your response. No, I did not reset the application prior to upgrading. That was simply upgrading KafkaStreams from 2.4 to 2.7. I was able to reproduce it on a smaller environment, and it does indeed recover. In a large environment, though, it keeps like that for hours. In this same large environment, I had to downgrade the application, and when doing that I did reset the application, which just took a few minutes. Thanks Murilo On Mon, 15 Mar 2021 at 06:21, Bruno Cadonna wrote: Hi Murilo, No, you do not need any special procedure to upgrade from 2.4 to 2.7. What you see in the logs is not an error but a warning. It should not block you on startup forever. The warning says that the local states of task 7_17 are corrupted because the offset you want to fetch of the state changelog topic partition my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17 is larger or smaller than the offsets that exist on the brokers for that partition. If Streams runs into such an exception it will recreate the state from scratch which might take a while depending on the size of the state. The root cause of this warning is not clear from the information you gave. Did you maybe reset the application but not wipe out the local state stores? Best, Bruno On 12.03.21 19:11, Murilo Tavares wrote: Hi I have Kafka brokers running on version 2.4.1, with a KafkaStreams app on 2.4.0. I'm trying to upgrade my KafkaStreams to v2.7.0, but I got my instances stuck on startup. In my understanding, I don't need any special procedure to upgraded from KStreams 2.4.0 to 2.7.0, right? The following error stands out for me: 2021-03-12 16:23:52.005 [...] WARN org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [...] 
Detected the states of tasks {7_17=[my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17]} are corrupted. Will close the task as dirty and re-create and bootstrap from scratch. org.apache.kafka.streams.errors.TaskCorruptedException: Tasks with changelogs {7_17=[my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17]} are corrupted and hence needs to be re-initialized at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:446) ~[app.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:744) ~[app.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:625) ~[app.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) [app.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512) [app.jar:?] Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Fetch position FetchPosition{offset=738, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[...:9092 (id: 3 rack: euw1-az1)], epoch=27}} is out of range for partition my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17 at org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange(Fetcher.java:1366) ~[app.jar:?] at org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1318) ~[app.jar:?] at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:614) ~[app.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1272) ~[app.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1233) ~[app.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206) ~[app.jar:?] at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:433) ~[app.jar:?] 
... 4 more Any suggestions on how to upgrade? Thanks Murilo
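For reference, resetting a Streams application as discussed in this thread involves both the broker-side reset tool and wiping local state; skipping the local wipe is one way to end up with the `TaskCorruptedException` / `OffsetOutOfRangeException` shown above. A sketch with illustrative names (exact flags vary by Kafka version; 2.x ships the tool as shown):

```
# 1) Stop all instances of the application first.
bin/kafka-streams-application-reset.sh \
  --application-id my-assembler \
  --bootstrap-servers broker:9092 \
  --input-topics my-input-topic

# 2) Also wipe local state on each instance (or call KafkaStreams#cleanUp()
#    before restart); otherwise the local stores reference changelog offsets
#    that no longer exist on the brokers.
rm -rf /tmp/kafka-streams/my-assembler
```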
Re: Redis as state store
Hi Alex, You are right! There is no "exactly once magic" baked into the RocksDB store code. The point is local vs remote. When a Kafka Streams client closes dirty under EOS, the state (i.e., the content of the state store) needs to be wiped out and re-created from scratch from the changelog topic on the brokers. To wipe out the state, the state directory is deleted. For a remote state store, wiping out the state directory would not delete the contents of the remote state store before Kafka Streams re-creates the content from scratch. Wiping out the state directory happens outside of the API implemented by a state store. Best, Bruno On 15.03.21 13:59, Alex Craig wrote: " Another issue with 3rd party state stores could be violation of exactly-once guarantee provided by kafka streams in the event of a failure of streams application instance" I've heard this before but would love to know more about how a custom state store would be at any greater risk than RocksDB as far as exactly-once guarantees are concerned. They all implement the same interface, so as long as you're correctly implementing get(), put(), delete(), flush(), etc, you should be fine right? In other words, I don't think there is any special "exactly once magic" that is baked into the RocksDB store code. I could be wrong though so I'd love to hear people's thoughts, thanks, Alex C On Sun, Mar 14, 2021 at 4:58 PM Parthasarathy, Mohan wrote: Thanks for the responses. In the worst case, I might have to keep both rocksdb for local store and keep an external store like Redis. -mohan On 3/13/21, 8:53 PM, "Pushkar Deole" wrote: Another issue with 3rd party state stores could be violation of exactly-once guarantee provided by kafka streams in the event of a failure of streams application instance. 
Kafka provides exactly once guarantee for consumer -> process -> produce through transactions and with the use of state store like redis, this guarantee is weaker On Sat, Mar 13, 2021 at 5:28 AM Guozhang Wang wrote: > Hello Mohan, > > I think what you had in mind works with Redis, since it is a remote state > store engine, it does not have the co-partitioning requirements as local > state stores. > > One thing you'd need to tune KS though is that with remote stores, the > processing latency may be larger, and since Kafka Streams process all > records of a single partition in order, synchronously, you may need to tune > the poll interval configs etc to make sure KS would stay in the consumer > group and not trigger unnecessary rebalances. > > Guozhang > > On Thu, Mar 11, 2021 at 6:41 PM Parthasarathy, Mohan < mpart...@hpe.com> > wrote: > > > Hi, > > > > I have a use case where messages come in with some key gets assigned some > > partition and the state gets created. Later, key changes (but still > > contains the old key in the message) and gets sent to a different > > partition. I want to be able to grab the old state using the old key > before > > creating the new state on this instance. Redis as a state store makes it > > easy to implement this where I can simply do a lookup before creating the > > state. I see an implementation here : > > > https://github.com/andreas-schroeder/redisks/tree/master/src/main/java/com/github/andreas_schroeder/redisks > > > > Has anyone tried this ? Any caveats. > > > > Thanks > > Mohan > > > > > > -- > -- Guozhang >
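One idea floated in this thread is a `deleteAll`-style fallback for stores that cannot be wiped via the file system. There is no such method on the store interface today; the sketch below is hypothetical (interface and names are illustrative, not the Kafka API): a default that falls back to per-key deletes, slower than a native truncate but enough to restore a clean slate before the changelog is replayed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical store interface with a default deleteAll() fallback.
interface RemoteKeyValueStore<K, V> {
    V get(K key);
    void put(K key, V value);
    void delete(K key);
    Iterable<K> keys();

    // Default: iterate and delete key by key. Remote engines with a native
    // truncate (e.g. a FLUSHDB-like call) would override this.
    default void deleteAll() {
        List<K> snapshot = new ArrayList<>();
        for (K k : keys()) snapshot.add(k); // avoid mutating while iterating
        for (K k : snapshot) delete(k);
    }
}

// Map-backed stand-in for a remote store, used to exercise the default.
class MapBackedStore implements RemoteKeyValueStore<String, Long> {
    private final Map<String, Long> data = new LinkedHashMap<>();
    public Long get(String key) { return data.get(key); }
    public void put(String key, Long value) { data.put(key, value); }
    public void delete(String key) { data.remove(key); }
    public Iterable<String> keys() { return data.keySet(); }
    public int size() { return data.size(); }
}

public class DeleteAllDemo {
    public static void main(String[] args) {
        MapBackedStore store = new MapBackedStore();
        store.put("a", 1L);
        store.put("b", 2L);
        store.deleteAll(); // per-key fallback empties the store
        System.out.println(store.size()); // prints 0
    }
}
```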
Re: Error upgrading KafkaStreams
Hi Bruno Thanks for your response. No, I did not reset the application prior to upgrading. That was simply upgrading KafkaStreams from 2.4 to 2.7. I was able to reproduce it on a smaller environment, and it does indeed recover. In a large environment, though, it keeps like that for hours. In this same large environment, I had to downgrade the application, and when doing that I did reset the application, which just took a few minutes. Thanks Murilo On Mon, 15 Mar 2021 at 06:21, Bruno Cadonna wrote: > Hi Murilo, > > No, you do not need any special procedure to upgrade from 2.4 to 2.7. > > What you see in the logs is not an error but a warning. It should not > block you on startup forever. The warning says that the local states of > task 7_17 are corrupted because the offset you want to fetch of the > state changelog topic partition > my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17 is larger > or smaller than the offsets that exist on the brokers for that > partition. If Streams runs into such an exception it will recreate the > state from scratch which might take a while depending on the size of the > state. > > The root cause of this warning is not clear from the information you > gave. Did you maybe reset the application but not wipe out the local > state stores? > > Best, > Bruno > > On 12.03.21 19:11, Murilo Tavares wrote: > > Hi > > I have Kafka brokers running on version 2.4.1, with a KafkaStreams app on > > 2.4.0. > > I'm trying to upgrade my KafkaStreams to v2.7.0, but I got my instances > > stuck on startup. > > In my understanding, I don't need any special procedure to upgraded from > > KStreams 2.4.0 to 2.7.0, right? > > > > The following error stands out for me: > > > > 2021-03-12 16:23:52.005 [...] WARN > > org.apache.kafka.streams.processor.internals.StreamThread - > stream-thread > > [...] Detected the states of tasks > > {7_17=[my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17]} > are > > corrupted. 
Will close the task as dirty and re-create and bootstrap from > > scratch. > > org.apache.kafka.streams.errors.TaskCorruptedException: Tasks with > > changelogs > > {7_17=[my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17]} > are > > corrupted and hence needs to be re-initialized > > at > > > org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:446) > > ~[app.jar:?] > > at > > > org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:744) > > ~[app.jar:?] > > at > > > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:625) > > ~[app.jar:?] > > at > > > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) > > [app.jar:?] > > at > > > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512) > > [app.jar:?] > > Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: > > Fetch position FetchPosition{offset=738, offsetEpoch=Optional.empty, > > currentLeader=LeaderAndEpoch{leader=Optional[...:9092 (id: 3 rack: > > euw1-az1)], epoch=27}} is out of range for partition > > my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17 > > at > > > org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange(Fetcher.java:1366) > > ~[app.jar:?] > > at > > > org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1318) > > ~[app.jar:?] > > at > > > org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:614) > > ~[app.jar:?] > > at > > > org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1272) > > ~[app.jar:?] > > at > > > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1233) > > ~[app.jar:?] > > at > > > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206) > > ~[app.jar:?] 
> > at > > > org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:433) > > ~[app.jar:?] > > ... 4 more > > > > Any suggestions on how to upgrade? > > Thanks > > Murilo > > >
Re: Redis as state store
" Another issue with 3rd party state stores could be violation of exactly-once guarantee provided by kafka streams in the event of a failure of streams application instance" I've heard this before but would love to know more about how a custom state store would be at any greater risk than RocksDB as far as exactly-once guarantees are concerned. They all implement the same interface, so as long as you're correctly implementing get(), put(), delete(), flush(), etc, you should be fine right? In other words, I don't think there is any special "exactly once magic" that is baked into the RocksDB store code. I could be wrong though so I'd love to hear people's thoughts, thanks, Alex C On Sun, Mar 14, 2021 at 4:58 PM Parthasarathy, Mohan wrote: > Thanks for the responses. In the worst case, I might have to keep both > rocksdb for local store and keep an external store like Redis. > > -mohan > > > On 3/13/21, 8:53 PM, "Pushkar Deole" wrote: > > Another issue with 3rd party state stores could be violation of > exactly-once guarantee provided by kafka streams in the event of a > failure > of streams application instance. > Kafka provides exactly once guarantee for consumer -> process -> > produce > through transactions and with the use of state store like redis, this > guarantee is weaker > > On Sat, Mar 13, 2021 at 5:28 AM Guozhang Wang > wrote: > > > Hello Mohan, > > > > I think what you had in mind works with Redis, since it is a remote > state > > store engine, it does not have the co-partitioning requirements as > local > > state stores. > > > > One thing you'd need to tune KS though is that with remote stores, > the > > processing latency may be larger, and since Kafka Streams process all > > records of a single partition in order, synchronously, you may need > to tune > > the poll interval configs etc to make sure KS would stay in the > consumer > > group and not trigger unnecessary rebalances. 
> > > > Guozhang > > > > On Thu, Mar 11, 2021 at 6:41 PM Parthasarathy, Mohan < > mpart...@hpe.com> > > wrote: > > > > > Hi, > > > > > > I have a use case where messages come in with some key gets > assigned some > > > partition and the state gets created. Later, key changes (but still > > > contains the old key in the message) and gets sent to a different > > > partition. I want to be able to grab the old state using the old > key > > before > > > creating the new state on this instance. Redis as a state store > makes it > > > easy to implement this where I can simply do a lookup before > creating the > > > state. I see an implementation here : > > > > > > https://github.com/andreas-schroeder/redisks/tree/master/src/main/java/com/github/andreas_schroeder/redisks > > > > > > Has anyone tried this ? Any caveats. > > > > > > Thanks > > > Mohan > > > > > > > > > > -- > > -- Guozhang > > > > >
Schema registry for Kafka topic
Hi, We have an in-house cluster of Kafka brokers and ZooKeepers. Kafka version kafka_2.12-2.7.0 ZooKeeper version apache-zookeeper-3.6.2-bin The topic is published to Google BigQuery. We would like to use a schema registry, as opposed to sending the schema with the payload of each message, which adds to the volume of traffic. As I understand from the available literature, this schema registry acts as a kind of API to decouple consumers from producers and provide a common interface for topic metadata. Currently I am looking at the open source community version for creating and maintaining this schema registry. We tried the wepay connector from Confluent (which I assumed would seamlessly allow one to publish a Kafka topic (source JSON) to Google BigQuery), but we are still getting the error: org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception. at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614) at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329) at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: com.wepay.kafka.connect.bigquery.exception.ConversionConnectException: Top-level Kafka Connect schema must be of type 'struct' Appreciate any advice. Thanks *Disclaimer:* Use it at your own risk. 
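The "Top-level Kafka Connect schema must be of type 'struct'" error above typically means the value converter produced something other than a Connect struct (a primitive, a map, or schemaless JSON). As a sketch (field names illustrative): with `org.apache.kafka.connect.json.JsonConverter` and `value.converter.schemas.enable=true`, each message value must carry a schema/payload envelope whose top-level schema type is `struct`:

```json
{
  "schema": {
    "type": "struct",
    "name": "example.Record",
    "optional": false,
    "fields": [
      { "field": "id",   "type": "int64",  "optional": false },
      { "field": "name", "type": "string", "optional": true }
    ]
  },
  "payload": { "id": 42, "name": "alice" }
}
```

To avoid sending the schema with every message, the usual alternative is an Avro (or JSON Schema) converter backed by a schema registry, so only a small schema ID travels with each record.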
Re: Error upgrading KafkaStreams
Hi Murilo, No, you do not need any special procedure to upgrade from 2.4 to 2.7. What you see in the logs is not an error but a warning. It should not block you on startup forever. The warning says that the local states of task 7_17 are corrupted because the offset you want to fetch of the state changelog topic partition my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17 is larger or smaller than the offsets that exist on the brokers for that partition. If Streams runs into such an exception it will recreate the state from scratch which might take a while depending on the size of the state. The root cause of this warning is not clear from the information you gave. Did you maybe reset the application but not wipe out the local state stores? Best, Bruno On 12.03.21 19:11, Murilo Tavares wrote: Hi I have Kafka brokers running on version 2.4.1, with a KafkaStreams app on 2.4.0. I'm trying to upgrade my KafkaStreams to v2.7.0, but I got my instances stuck on startup. In my understanding, I don't need any special procedure to upgraded from KStreams 2.4.0 to 2.7.0, right? The following error stands out for me: 2021-03-12 16:23:52.005 [...] WARN org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [...] Detected the states of tasks {7_17=[my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17]} are corrupted. Will close the task as dirty and re-create and bootstrap from scratch. org.apache.kafka.streams.errors.TaskCorruptedException: Tasks with changelogs {7_17=[my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17]} are corrupted and hence needs to be re-initialized at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:446) ~[app.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:744) ~[app.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:625) ~[app.jar:?] 
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) [app.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512) [app.jar:?] Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Fetch position FetchPosition{offset=738, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[...:9092 (id: 3 rack: euw1-az1)], epoch=27}} is out of range for partition my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17 at org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange(Fetcher.java:1366) ~[app.jar:?] at org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1318) ~[app.jar:?] at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:614) ~[app.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1272) ~[app.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1233) ~[app.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206) ~[app.jar:?] at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:433) ~[app.jar:?] ... 4 more Any suggestions on how to upgrade? Thanks Murilo
Re: topics Replicas not equals Isr in zookeeper
The root cause of this issue is now clear: when increasing partitions, a colleague's code used the 0.10 AdminUtils to redistribute the replicas of the existing partitions and wrote the new assignment directly into the znode, and the controller failed to handle this logic normally. The Kafka version I run is 2.0.0; I don't know whether the latest Kafka versions handle this exception, or whether it is an optimization problem. wenbing shen wrote on Thu, Mar 11, 2021 at 11:07 PM: > [image: image.png] > This is a screenshot of a problem that occurred in our product at the customer > site. Does anyone know the reason? > > wenbing shen wrote on Thu, Mar 11, 2021 at 7:17 PM: >> A large number of replicas of the topic are inconsistent with the ISR; an >> example is as follows: >> Topic: td_tip  Partition: 0  Leader: 1001  Replicas: 1003,1005  Isr: 1001,1002 >> Topic: td_tip  Partition: 1  Leader: 1006  Replicas: 1004,1007  Isr: 1006,1009 >> Topic: td_tip  Partition: 2  Leader: 1011  Replicas: 1020,1022  Isr: 1012,1011 >> Topic: td_tip  Partition: 3  Leader: 1009  Replicas: 1015,1016  Isr: 1009,1008
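Rather than writing a replica assignment directly into the znode via AdminUtils, the supported path is the partition-reassignment tool, which routes the change through the controller. A sketch (broker IDs taken from the listing above; flags shown are for Kafka 2.0, where the tool still talks to ZooKeeper — newer versions use `--bootstrap-server`):

```
cat > reassign.json <<'EOF'
{
  "version": 1,
  "partitions": [
    { "topic": "td_tip", "partition": 0, "replicas": [1003, 1005] },
    { "topic": "td_tip", "partition": 1, "replicas": [1004, 1007] }
  ]
}
EOF

# Start the reassignment through the controller, then verify completion.
bin/kafka-reassign-partitions.sh --zookeeper zk:2181 \
  --reassignment-json-file reassign.json --execute
bin/kafka-reassign-partitions.sh --zookeeper zk:2181 \
  --reassignment-json-file reassign.json --verify
```

Once the reassignment completes, Replicas and Isr in `kafka-topics.sh --describe` should converge again.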