[Kafka Consumer] deserializer (documentation) mismatch?
Hi,

I found a mismatch between the documentation of
org.apache.kafka.common.serialization.Deserializer and the implementation
in KafkaConsumer. The Deserializer documentation says: "serialized bytes;
may be null; implementations are recommended to handle null by returning a
value or null rather than throwing an exception", but in the KafkaConsumer,
'null' is never passed to the deserializer. From 'parseRecord' in
'org.apache.kafka.clients.consumer.internals.Fetcher':

    K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), headers, keyByteArray);
    ByteBuffer valueBytes = record.value();
    byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes);
    V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);

I stumbled upon this discrepancy while trying to pass a valid object from
the deserializer to the application when a 'delete' was received on a
log-compacted topic. So basically the question I have here is the
following: is the documentation in the Deserializer wrong, or is the
implementation in the Fetcher wrong? To me it seems more plausible to have
'null' processed by the deserializer than to have the Fetcher short-circuit
on 'null' values ...

Any thoughts?
Greets,
Bart
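For reference, a null-tolerant deserializer of the kind the Javadoc recommends could look like the sketch below. The class name and the plain static shape are illustrative (a real implementation would implement org.apache.kafka.common.serialization.Deserializer<String>); only the deserialize logic is shown so it compiles standalone. Note that, as the thread points out, with the Fetcher short-circuiting on null this branch is never actually reached today.

```java
import java.nio.charset.StandardCharsets;

// Sketch of a deserializer that follows the Deserializer Javadoc:
// "serialized bytes; may be null; implementations are recommended to
// handle null by returning a value or null rather than throwing".
// Name and shape are illustrative, not code from Kafka itself.
public class NullTolerantStringDeserializer {

    public static String deserialize(String topic, byte[] data) {
        if (data == null) {
            // A 'delete' (tombstone) on a log-compacted topic arrives as a
            // null value; return null instead of throwing, per the Javadoc.
            return null;
        }
        return new String(data, StandardCharsets.UTF_8);
    }
}
```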
Re: [KafkaStreams 1.1.1] partition assignment broken?
Hi Bill,

So we ended up applying the fix for KAFKA-7144 onto Kafka 1.1.1 and now all
works fine. Thanks for the insight.

Greets,
Bart

On Tue, Oct 9, 2018 at 4:49 PM Bill Bejeck wrote:
> Hi Bart,
>
> Sounds good. Let me know how it goes.
>
> -Bill
Re: [KafkaStreams 1.1.1] partition assignment broken?
Hi Bill,

Thanks for the reply.
We had a look at the patch for KAFKA-7144 and will try it out on Kafka 1.1.1.
Currently a full upgrade to 2.0.x is not yet an option.

In the meantime I have some unit-tests that reproduce this problem, so the
backport to v1.1.1 can easily be verified.

Greets,
Bart

On Tue, Oct 9, 2018 at 12:27 AM Bill Bejeck wrote:
> Hi Bart,
>
> This is a known issue discovered in version 1.1 -
> https://issues.apache.org/jira/browse/KAFKA-7144
>
> This issue has been fixed in Kafka Streams 2.0, any chance you can upgrade
> to 2.0?
>
> Thanks,
> Bill
Re: [KafkaStreams 1.1.1] partition assignment broken?
Thanks John,

I'll see what I can do regarding the logs ...
As a side note, our Kafka cluster is running version v1.1.1 with the
v0.10.2.1 log format configuration (due to another issue: KAFKA-6000).
But, as said, I'll try to come up with some detailed logs, or a scenario to
reproduce this.

Greets,
Bart

On Mon, Oct 8, 2018 at 8:37 PM John Roesler wrote:
> Hi Bart,
>
> I suspected it might not be feasible to just dump your production logs
> onto the internet.
>
> A repro would be even better, but I bet it wouldn't show up when you try
> and reproduce it. Good luck!
>
> If the repro doesn't turn out, maybe you could just extract the assignment
> lines from your logs?
>
> Thanks,
> -John
Re: [KafkaStreams 1.1.1] partition assignment broken?
Hi John,

Zipping up some logs from our running Kafka cluster is going to be a bit
difficult.
What I can do is try to reproduce this off-line and capture the logs from
there.

We also had a look in the PartitionAssignor source code (for 1.1.1) and
indeed this behaviour is a bit weird, as from the source code I'd expect
equally divided partitions.

Anyway, hopefully I'll be able to reproduce this issue with some simple
unit-test-like code.
I'll post the results when I have more info.

Greets,
Bart

On Mon, Oct 8, 2018 at 7:36 PM John Roesler wrote:
> Hi Bart,
>
> This sounds a bit surprising. Is there any chance you can zip up some logs
> so we can see the assignment protocol on the nodes?
>
> Thanks,
> -John

--
Mvg,
Bart Vercammen
clouTrix BVBA
+32 486 69 17 68
i...@cloutrix.com
[KafkaStreams 1.1.1] partition assignment broken?
Hi,

I recently moved some KafkaStreams applications from v0.10.2.1 to v1.1.1
and now I notice a weird behaviour in the partition assignment.
When starting 4 instances of my Kafka Streams application (on v1.1.1) I see
that 17 of the 20 partitions (of a source topic) are assigned to 1 instance
of the application, while the other 3 instances only get 1 partition
assigned. (Previously, on v0.10.2.1, they all got 5 partitions.)

Is this expected behaviour? I read that quite some improvements were done
in the partition assignment strategy for Kafka Streams applications.
If yes, how can I make it so that the partitions are equally divided again
across all running applications? It's a bit weird in my opinion, as this
makes scaling the application very hard.

Also, when initially starting with 1 instance of the application, and
gradually scaling up, the new instances only get 1 partition assigned ...

All my Streams applications use default configuration (more or less),
running 1 stream-thread.

Any suggestions / enlightenments on this?
Greets,
Bart
Re: [kafka streams] discuss: dynamically update subscription pattern
Hi Guozhang,

In the end I opted for a native Kafka consumer/producer application instead
of using Kafka Streams for this. The overhead of creating new streams
applications for each update of the metadata was a bit too cumbersome.

But still, the issue remains that, although this works (thanks for the
suggestion), I double the data on Kafka. In order to reduce the data I
started moving some of the aggregation logic from my streams application to
this new application that combines the input streams. So this leaves me
with only little logic in the streams application: preserving the state in
its Kafka-backed state-stores. So it already is happening: now that I
implement state handling in my "combining application", I will end up
creating my own stream-processing framework instead of leveraging Kafka
Streams ... this was not really the intention.

So, in short, your suggestion is a nice work-around, but it still leaves me
with my initial remark that it would be useful to somehow be able to alter
the subscriptions in a running streams application.

Bart

On Tue, Aug 15, 2017 at 1:45 PM, Bart Vercammen <b...@cloutrix.com> wrote:
> Hi Guozhang,
>
> Thanks for your swift feedback.
> Using your "Pipe App" example might actually be a neat work-around.
> I'll see if I can work out a simple prototype for this on our platform.
Re: [kafka streams] discuss: dynamically update subscription pattern
Hi Guozhang,

Thanks for your swift feedback.
Using your "Pipe App" example might actually be a neat work-around.
I'll see if I can work out a simple prototype for this on our platform.

The only downside of this is that I will double the message-load on the
platform (from source-topics to processing-topic).
But again, I'll try it out and see how far I get with this solution.

Thanks again for sharing your insights,
Bart

On Mon, Aug 14, 2017 at 10:10 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> Hello Bart,
>
> Thanks for your detailed explanation. I see your motivation now and it
> indeed validates itself as a single application that dynamically changes
> subscriptions.
>
> As I mentioned, Streams today does not have native support for dynamically
> changing subscriptions. That being said, if you do not care about the
> offsets then there may be one sub-optimal workaround: you can have a
> two-stage pipeline to separate topic consumption from processing, where
> the first stage is a very simple "Pipe" app (e.g.
> https://github.com/apache/kafka/blob/trunk/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/Pipe.java)
> which is very lightweight in resource consumption. Each app pipes from a
> topic to the processing topic, which aggregates all the topics that you
> want to track in the second stage, i.e. the processing stage.
>
> Your processing app only reads from this aggregated source topic, while
> your piping apps pipe each input topic to the aggregate topic. You can
> read from the global "command" topic to 1) auto-generate the code with the
> source topic swapped with the specified string, compile and execute the
> code, or 2) shut down an existing program piping from a topic. This will
> admittedly introduce a duplicate topic containing the aggregated data, but
> operational-wise may still be simpler.
>
> Guozhang
>
> On Mon, Aug 14, 2017 at 2:47 AM, Bart Vercammen <b...@cloutrix.com> wrote:
>
> > Hi Guozhang,
> >
> > For the use-cases I have in mind, the offset of the source topics is
> > irrelevant to the state stored by the streams application.
> > So, when topic 'A' gets dropped and topic 'B' is added, I would prefer
> > the application to start reading from 'latest', but that is actually not
> > *that* important to me.
> > The main thing is that I'm able somehow to add a new kafka topic to the
> > source of the streams application at runtime, triggered by messages
> > flowing on another "metadata" kafka topic.
> >
> > So considering your example:
> > When 'B' is added, I would expect the streams application to start
> > reading from 'latest' (but not that important).
> > When 'A' is removed, the state from 'A' is still valid in the state
> > stores, but 'A' should not be tracked anymore.
> > When 'A' is added again, I would expect the streams application to start
> > reading from 'latest' (not the committed offset, but again not that
> > important for my use-case).
> >
> > But this being said, my main focus is on the ability to 'add' new kafka
> > topics to the application rather than removing them.
> > What I could do is define a wildcard subscription on all topics. This
> > would update dynamically then, but the problem here is that I would run
> > *all* topics through the application, which is major overkill and would
> > make it unperformant (especially as there are topics in there that
> > produce a lot of data that should not be tracked, but would pass through
> > the streams application then). Let's say from the 300 kafka topics on
> > the system, about 50 of them need to be tracked by the application.
> > We have the ability to mark these topics through the "metadata" topic,
> > so it would be nice if this could also trigger updating the
> > source-pattern for the subscriptions in the kafka streams application.
> >
> > The problem with multiple applications is the following:
> > - the "state" should be centralized, as it can be queried (having
> > multiple applications would make it more difficult to achieve this)
> > - multiple applications will require more resources to be reserved on
> > the cluster
> > - we need an external tool to start/stop the streams applications,
> > depending on the info on the metadata topic.
> >
> > Greets,
> > Bart
> >
> > On Mon, Aug 14, 2017 at 3:03 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Hello Bart,
> > >
> > > Before we talk about dynamic subscription in Streams
[kafka streams] discuss: dynamically update subscription pattern
Hi,

I have a question on what would be the best way to implement something
within Kafka Streams. The thing I would like to do: dynamically update the
subscription pattern of the source topics.

The reasoning behind this (in my project): metadata about the source topics
is evented on another kafka topic that should be tracked by the kafka
streams topology, and depending on that metadata, specific source topics
should be added to, or removed from, the kafka streams topology.

Currently I track the "metadata topic" as "global state", so that every
processor can actually access it to fetch the metadata (this metadata for
instance also describes whether or not a specific topic pattern should be
tracked by the stream processor) - so consider this as some kind of
"configuration" stream about the source topics.

So now it comes: is there any way I could (from a running topology) update
the kafka consumer subscriptions, so that I'm able to replace the source
topic pattern while the topology is running? I don't think there currently
is a way to do this, but as under the hood it is just a kafka consumer, my
belief is that it should be possible somehow ...

I was thinking about the PartitionAssignor ... if I could get my hands on
that one, maybe I could dynamically configure it to only allow specific
topic-patterns? Or directly alter the subscription on the underlying
consumer?

I don't know all the nifty details about the Kafka Streams internals, so it
would be nice if someone could direct me in the right direction to achieve
this ...

Thanks,
Bart
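For context: Kafka Streams does accept a regex Pattern as a source at topology build time; what is missing is changing it afterwards. The pattern-building side could be sketched as below - the helper class, method name, and topic names are all made up for illustration, and the resulting Pattern would only take effect if it were (re)applied at build time, which is exactly the limitation being discussed:

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Sketch: build a source-topic regex from the set of topics that the
// "metadata" topic marks as tracked. In Kafka Streams such a Pattern can
// be handed to the builder's stream(Pattern) overload, but only once,
// when the topology is built -- not while it is running.
public class SourcePatternBuilder {

    public static Pattern fromTrackedTopics(List<String> trackedTopics) {
        String alternation = trackedTopics.stream()
                .map(Pattern::quote)   // escape regex metacharacters in topic names
                .collect(Collectors.joining("|"));
        return Pattern.compile("^(" + alternation + ")$");
    }
}
```

So instead of a wildcard over all 300 topics, only the ~50 tracked ones would match - but a rebuilt pattern still requires restarting the topology today.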
Re: [kafka streams] 'null' values in state stores
Hi Guy,

Indeed, I referenced the wrong source code, sorry about that ;-)
I created KAFKA-5717 for this.

Thanks,
Bart

On Tue, Aug 8, 2017 at 8:08 PM, Damian Guy <damian@gmail.com> wrote:
> The change logger is not used during restoration of the in-memory store.
> Restoration is handled here:
> https://github.com/apache/kafka/blob/0.11.0/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java#L79
>
> But, even then it is just putting `null` when it should be deleting it.
> Feel free to raise a JIRA.
>
> Thanks,
> Damian
>
> On Tue, 8 Aug 2017 at 12:09 Bart Vercammen <b...@cloutrix.com> wrote:
>
> > That's RocksDB .. I'm using in-memory stores ...
> > Here:
> > https://github.com/apache/kafka/blob/0.11.0/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java#L56
> > the 'null' is not checked ...
> >
> > On Tue, Aug 8, 2017 at 12:52 PM, Damian Guy <damian@gmail.com> wrote:
> >
> > > Hi,
> > > The null values are treated as deletes when they are written to the
> > > store. You can see here:
> > > https://github.com/apache/kafka/blob/0.11.0/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L261
> > >
> > > On Tue, 8 Aug 2017 at 11:22 Bart Vercammen <b...@cloutrix.com> wrote:
> > >
> > > > Hi,
> > > >
> > > > I noticed the following:
> > > > When a kafka streams application starts, it will restore its state
> > > > in its state-stores (from the log-compacted kafka topic). All good
> > > > so far, but I noticed that the 'deleted' entries are actually read
> > > > into the store as 'key' with value `null`.
> > > >
> > > > Is this expected behaviour? I would assume that 'null' values are
> > > > ignored when restoring the state, as this is exactly how the entries
> > > > are deleted on the log-compacted kafka-topic.
> > > >
> > > > When compaction has run on the kafka topic, all is fine, but when
> > > > the segment is not compacted yet, these null values are read in.
> > > >
> > > > Greets,
> > > > Bart
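The fix that KAFKA-5717 tracks amounts to treating a null value as a delete during restore instead of storing it. A minimal sketch of such a restore loop, using a plain map to stand in for the state store (all names illustrative - this is not the actual InMemoryKeyValueStore code):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of a changelog restore loop that honours tombstones: a record
// with a null value deletes the key instead of storing `null`. This
// mirrors the behaviour discussed in the thread; it is not the real
// Kafka Streams implementation.
public class RestoreSketch {

    public static Map<String, String> restore(Iterable<Map.Entry<String, String>> changelog) {
        Map<String, String> store = new LinkedHashMap<>();
        for (Map.Entry<String, String> record : changelog) {
            if (record.getValue() == null) {
                store.remove(record.getKey());   // tombstone: delete the entry
            } else {
                store.put(record.getKey(), record.getValue());
            }
        }
        return store;
    }
}
```

With this shape, an uncompacted segment containing a tombstone leaves no `null`-valued entry behind after restore.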
Re: Kafka 0.10.1.0 consumer group rebalance broken?
Well, well, Mr. Gruchalski, always nice to talk to you ;-)

Not sure whether it is indeed related to:
https://github.com/apache/kafka/commit/4b003d8bcfffded55a00b8ecc9eed8eb373fb6c7#diff-d2461f78377b588c4f7e2fe8223d5679R633
But I'll have a look and will try to create a test scenario for this that
is able to reproduce the issue at hand.

I'll also include some logs in my following posts.
Thanks for the reply ... food for thought indeed ...

On Mon, Nov 28, 2016 at 10:17 PM, Radek Gruchalski <ra...@gruchalski.com> wrote:
> There have been plenty of changes in the GroupCoordinator and co between
> these two releases:
> https://github.com/apache/kafka/commit/40b1dd3f495a59abef8a0cba5450526994c92c04#diff-96e4cf31cd54def6b2fb3f7a118c1db3
>
> It might be related to this:
> https://github.com/apache/kafka/commit/4b003d8bcfffded55a00b8ecc9eed8eb373fb6c7#diff-d2461f78377b588c4f7e2fe8223d5679R633
>
> If your group is empty, your group is marked dead; when the group is
> dead, no matter what you do, it'll reply with:
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala#L353
>
> Food for thought.
>
> Best regards,
> Radek Gruchalski
> ra...@gruchalski.com

--
Mvg,
Bart Vercammen
clouTrix BVBA
+32 486 69 17 68
i...@cloutrix.com
Kafka 0.10.1.0 consumer group rebalance broken?
Hi,

It seems that consumer group rebalance is broken in Kafka 0.10.1.0?

When running a small test-project:
- consumers running in their own JVM (with different 'client.id')
- producer running in its own JVM
- kafka broker: the embedded kafka: KafkaServerStartable

It looks like the consumers lose their heartbeat after a rebalance got
triggered. In the logs on the consumer I can actually see that the
heartbeat failed due to "invalid member_id".

When running the exact same code on a 0.10.0.1 setup, all works perfectly.
Anyone else seen this problem?

Greets,
Bart