[Kafka Consumer] deserializer (documentation) mismatch?

2018-10-15 Thread Bart Vercammen
Hi,

I found a mismatch between the documentation in
the org.apache.kafka.common.serialization.Deserializer and the
implementation in KafkaConsumer.

The Deserializer documentation says: *"serialized bytes; may be null;
implementations are recommended to handle null by returning a value or null
rather than throwing an exception"*,
but in the KafkaConsumer, 'null' is never passed to the deserializer.

From 'parseRecord' in 'org.apache.kafka.clients.consumer.internals.Fetcher':
{{
ByteBuffer keyBytes = record.key();
byte[] keyByteArray = keyBytes == null ? null : Utils.toArray(keyBytes);
K key = keyBytes == null ? null :
    this.keyDeserializer.deserialize(partition.topic(), headers, keyByteArray);
ByteBuffer valueBytes = record.value();
byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes);
V value = valueBytes == null ? null :
    this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);
}}

I stumbled upon this discrepancy while trying to pass a valid object from
the deserializer to the application when a 'delete' was received on a
log-compacted topic.
So basically the question I have here is the following: is the
documentation in the Deserializer wrong, or is the implementation in the
Fetcher wrong?
To me it seems more plausible to have 'null' processed by the deserializer,
rather than the Fetcher short-circuiting on 'null' values ...
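
To make the use-case concrete, here is a minimal sketch (class name and
sentinel value are invented for the example) of a deserializer that handles a
'null' tombstone by returning a marker value, as the javadoc suggests it may.
With the current Fetcher this branch can never be reached:

{{
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;

// Hypothetical example, not Kafka code: maps a null tombstone to a sentinel
// value instead of throwing, as the Deserializer javadoc allows.
public class TombstoneAwareStringDeserializer implements Deserializer<String> {
    public static final String DELETED = "<deleted>";  // invented sentinel

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {}

    @Override
    public String deserialize(String topic, byte[] data) {
        if (data == null) {
            // a 'delete' on a log-compacted topic arrives as a null payload
            return DELETED;
        }
        return new String(data, StandardCharsets.UTF_8);
    }

    @Override
    public void close() {}
}
}}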

Any thoughts?
Greets,
Bart


Re: [KafkaStreams 1.1.1] partition assignment broken?

2018-10-15 Thread Bart Vercammen
Hi Bill,

So we ended up applying the fix for KAFKA-7144 onto kafka 1.1.1 and now all
works fine.
Thanks for the insight.

Greets,
Bart

On Tue, Oct 9, 2018 at 4:49 PM Bill Bejeck  wrote:

> Hi Bart,
>
> Sounds good.  Let me know how it goes.
>
> -Bill
>
> On Tue, Oct 9, 2018 at 5:08 AM Bart Vercammen  wrote:
>
> > Hi Bill,
> >
> > Thanks for the reply.
> > We had a look at the patch for KAFKA-7144 and will try it out on Kafka
> > 1.1.1.
> > Currently a full upgrade to 2.0.x is not yet an option.
> >
> > In the meantime I have some unit-tests that reproduce this problem, so the
> > backport to v1.1.1 can easily be verified.
> >
> > Greets,
> > Bart
> >
> > On Tue, Oct 9, 2018 at 12:27 AM Bill Bejeck  wrote:
> >
> > > Hi Bart,
> > >
> > > This is a known issue discovered in version 1.1 -
> > > https://issues.apache.org/jira/browse/KAFKA-7144
> > >
> > > This issue has been fixed in Kafka Streams 2.0, any chance you can
> > upgrade
> > > to 2.0?
> > >
> > > Thanks,
> > > Bill
> > >
> > > On Mon, Oct 8, 2018 at 2:46 PM Bart Vercammen 
> wrote:
> > >
> > > > Thanks John,
> > > >
> > > > I'll see what I can do regarding the logs ...
> > > > As a side note, our Kafka cluster is running version v1.1.1 in
> > > > v0.10.2.1 log format configuration (due to another issue: KAFKA-6000)
> > > > But, as said, I'll try to come up with some detailed logs, or a
> > scenario
> > > to
> > > > reproduce this.
> > > >
> > > > Greets,
> > > > Bart
> > > >
> > > > On Mon, Oct 8, 2018 at 8:37 PM John Roesler 
> wrote:
> > > >
> > > > > Hi Bart,
> > > > >
> > > > > I suspected it might not be feasible to just dump your production
> > logs
> > > > onto
> > > > > the internet.
> > > > >
> > > > > A repro would be even better, but I bet it wouldn't show up when
> you
> > > try
> > > > > and reproduce it. Good luck!
> > > > >
> > > > > If the repro doesn't turn out, maybe you could just extract the
> > > > assignment
> > > > > lines from your logs?
> > > > >
> > > > > Thanks,
> > > > > -John
> > > > >
> > > > > On Mon, Oct 8, 2018 at 1:24 PM Bart Vercammen 
> > > wrote:
> > > > >
> > > > > > Hi John,
> > > > > >
> > > > > > Zipping up some logs from our running Kafka cluster is going to
> be
> > a
> > > > bit
> > > > > > difficult.
> > > > > > What I can do is try to reproduce this off-line and capture the
> > logs
> > > > from
> > > > > > there.
> > > > > >
> > > > > > We also had a look in the PartitionAssignor source code (for
> 1.1.1)
> > > and
> > > > > > indeed this behaviour is a bit weird
> > > > > > as from the source code I'd expect equally divided partitions.
> > > > > >
> > > > > > Anyway, hopefully I'll be able to reproduce this issue with some
> > > simple
> > > > > > unit-test like code.
> > > > > > I'll post the results when I have more info.
> > > > > >
> > > > > > Greets,
> > > > > > Bart
> > > > > >
> > > > > > On Mon, Oct 8, 2018 at 7:36 PM John Roesler 
> > > wrote:
> > > > > >
> > > > > > > Hi Bart,
> > > > > > >
> > > > > > > This sounds a bit surprising. Is there any chance you can zip
> up
> > > some
> > > > > > logs
> > > > > > > so we can see the assignment protocol on the nodes?
> > > > > > >
> > > > > > > Thanks,
> > > > > > > -John
> > > > > > >
> > > > > > > On Mon, Oct 8, 2018 at 4:32 AM Bart Vercammen <
> b...@cloutrix.com
> > >
> > > > > wrote:
> > > > > > >
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > I recently moved some KafkaStreams applications from
> v0.10.2.1
> > to
> > > > > > v1.1.1
> > > > > > > > and now I notice a weird behaviour in the partition
> assignment.
> > > > > > > > When starting 4 instances of my Kafka Streams application (on
> > > > > v1.1.1) I
> > > > > > > see
> > > > > > > > that 17 of the 20 partitions (of a source topic) are assigned
> > to
> > > 1
> > > > > > > instance
> > > > > > > > of the application while the other 3 instances only get 1
> > > partition
> > > > > > > > assigned. (previously (on v0.10.2.1) they all got 5
> > > > > > > > partitions.)
> > > > > > > >
> > > > > > > > Is this expected behaviour, as I read that quite some
> > > improvements
> > > > > were
> > > > > > > > done in the partition assignment strategy for Kafka Streams
> > > > > > applications?
> > > > > > > > If yes, how can I make it so that the partitions are equally
> > > > > > > > divided again across all running applications?   It's a bit
> > > > > > > > weird in my opinion as this makes scaling the application very
> > > > > > > > hard.
> > > > > > > >
> > > > > > > > Also, when initially starting with 1 instance of the
> > application,
> > > > and
> > > > > > > > gradually scaling up, the new instances only get 1 partition
> > > > assigned
> > > > > > ...
> > > > > > > >
> > > > > > > > All my Streams applications use default configuration (more
> or
> > > > less),
> > > > > > > > running 1 stream-thread.
> > > > > > > >
> > > > > > > > Any suggestions / enlightenments on this?
> > > > > > > > Greets,
> > > > > > > > Bart
> > > > > > > >
>


Re: [KafkaStreams 1.1.1] partition assignment broken?

2018-10-09 Thread Bart Vercammen
Hi Bill,

Thanks for the reply.
We had a look at the patch for KAFKA-7144 and will try it out on Kafka 1.1.1.
Currently a full upgrade to 2.0.x is not yet an option.

In the meantime I have some unit-tests that reproduce this problem, so the
backport to v1.1.1 can easily be verified.

Greets,
Bart

On Tue, Oct 9, 2018 at 12:27 AM Bill Bejeck  wrote:

> Hi Bart,
>
> This is a known issue discovered in version 1.1 -
> https://issues.apache.org/jira/browse/KAFKA-7144
>
> This issue has been fixed in Kafka Streams 2.0, any chance you can upgrade
> to 2.0?
>
> Thanks,
> Bill
>
> On Mon, Oct 8, 2018 at 2:46 PM Bart Vercammen  wrote:
>
> > Thanks John,
> >
> > I'll see what I can do regarding the logs ...
> > As a side note, our Kafka cluster is running version v1.1.1 in v0.10.2.1
> > log format configuration (due to another issue: KAFKA-6000)
> > But, as said, I'll try to come up with some detailed logs, or a scenario
> to
> > reproduce this.
> >
> > Greets,
> > Bart
> >
> > On Mon, Oct 8, 2018 at 8:37 PM John Roesler  wrote:
> >
> > > Hi Bart,
> > >
> > > I suspected it might not be feasible to just dump your production logs
> > onto
> > > the internet.
> > >
> > > A repro would be even better, but I bet it wouldn't show up when you
> try
> > > and reproduce it. Good luck!
> > >
> > > If the repro doesn't turn out, maybe you could just extract the
> > assignment
> > > lines from your logs?
> > >
> > > Thanks,
> > > -John
> > >
> > > On Mon, Oct 8, 2018 at 1:24 PM Bart Vercammen 
> wrote:
> > >
> > > > Hi John,
> > > >
> > > > Zipping up some logs from our running Kafka cluster is going to be a
> > bit
> > > > difficult.
> > > > What I can do is try to reproduce this off-line and capture the logs
> > from
> > > > there.
> > > >
> > > > We also had a look in the PartitionAssignor source code (for 1.1.1)
> and
> > > > indeed this behaviour is a bit weird
> > > > as from the source code I'd expect equally divided partitions.
> > > >
> > > > Anyway, hopefully I'll be able to reproduce this issue with some
> simple
> > > > unit-test like code.
> > > > I'll post the results when I have more info.
> > > >
> > > > Greets,
> > > > Bart
> > > >
> > > > On Mon, Oct 8, 2018 at 7:36 PM John Roesler 
> wrote:
> > > >
> > > > > Hi Bart,
> > > > >
> > > > > This sounds a bit surprising. Is there any chance you can zip up
> some
> > > > logs
> > > > > so we can see the assignment protocol on the nodes?
> > > > >
> > > > > Thanks,
> > > > > -John
> > > > >
> > > > > On Mon, Oct 8, 2018 at 4:32 AM Bart Vercammen 
> > > wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > I recently moved some KafkaStreams applications from v0.10.2.1 to
> > > > v1.1.1
> > > > > > and now I notice a weird behaviour in the partition assignment.
> > > > > > When starting 4 instances of my Kafka Streams application (on
> > > v1.1.1) I
> > > > > see
> > > > > > that 17 of the 20 partitions (of a source topic) are assigned to
> 1
> > > > > instance
> > > > > > of the application while the other 3 instances only get 1
> partition
> > > > > > assigned. (previously (on v0.10.2.1) they all got 5 partitions.)
> > > > > >
> > > > > > Is this expected behaviour, as I read that quite some
> improvements
> > > were
> > > > > > done in the partition assignment strategy for Kafka Streams
> > > > applications?
> > > > > > If yes, how can I make it so that the partitions are equally
> > > > > > divided again across all running applications?   It's a bit weird
> > > > > > in my opinion as this makes scaling the application very hard.
> > > > > >
> > > > > > Also, when initially starting with 1 instance of the application,
> > and
> > > > > > gradually scaling up, the new instances only get 1 partition
> > assigned
> > > > ...
> > > > > >
> > > > > > All my Streams applications use default configuration (more or
> > less),
> > > > > > running 1 stream-thread.
> > > > > >
> > > > > > Any suggestions / enlightenments on this?
> > > > > > Greets,
> > > > > > Bart
> > > > > >
>
>


Re: [KafkaStreams 1.1.1] partition assignment broken?

2018-10-08 Thread Bart Vercammen
Thanks John,

I'll see what I can do regarding the logs ...
As a side note, our Kafka cluster is running version v1.1.1 in v0.10.2.1 log
format configuration (due to another issue: KAFKA-6000)
But, as said, I'll try to come up with some detailed logs, or a scenario to
reproduce this.

Greets,
Bart

On Mon, Oct 8, 2018 at 8:37 PM John Roesler  wrote:

> Hi Bart,
>
> I suspected it might not be feasible to just dump your production logs onto
> the internet.
>
> A repro would be even better, but I bet it wouldn't show up when you try
> and reproduce it. Good luck!
>
> If the repro doesn't turn out, maybe you could just extract the assignment
> lines from your logs?
>
> Thanks,
> -John
>
> On Mon, Oct 8, 2018 at 1:24 PM Bart Vercammen  wrote:
>
> > Hi John,
> >
> > Zipping up some logs from our running Kafka cluster is going to be a bit
> > difficult.
> > What I can do is try to reproduce this off-line and capture the logs from
> > there.
> >
> > We also had a look in the PartitionAssignor source code (for 1.1.1) and
> > indeed this behaviour is a bit weird
> > as from the source code I'd expect equally divided partitions.
> >
> > Anyway, hopefully I'll be able to reproduce this issue with some simple
> > unit-test like code.
> > I'll post the results when I have more info.
> >
> > Greets,
> > Bart
> >
> > On Mon, Oct 8, 2018 at 7:36 PM John Roesler  wrote:
> >
> > > Hi Bart,
> > >
> > > This sounds a bit surprising. Is there any chance you can zip up some
> > logs
> > > so we can see the assignment protocol on the nodes?
> > >
> > > Thanks,
> > > -John
> > >
> > > On Mon, Oct 8, 2018 at 4:32 AM Bart Vercammen 
> wrote:
> > >
> > > > Hi,
> > > >
> > > > I recently moved some KafkaStreams applications from v0.10.2.1 to
> > v1.1.1
> > > > and now I notice a weird behaviour in the partition assignment.
> > > > When starting 4 instances of my Kafka Streams application (on
> v1.1.1) I
> > > see
> > > > that 17 of the 20 partitions (of a source topic) are assigned to 1
> > > instance
> > > > of the application while the other 3 instances only get 1 partition
> > > > assigned. (previously (on v0.10.2.1) they all got 5 partitions.)
> > > >
> > > > Is this expected behaviour, as I read that quite some improvements
> were
> > > > done in the partition assignment strategy for Kafka Streams
> > applications?
> > > > If yes, how can I make it so that the partitions are equally divided
> > > > again across all running applications?   It's a bit weird in my opinion
> > > > as this makes scaling the application very hard.
> > > >
> > > > Also, when initially starting with 1 instance of the application, and
> > > > gradually scaling up, the new instances only get 1 partition assigned
> > ...
> > > >
> > > > All my Streams applications use default configuration (more or less),
> > > > running 1 stream-thread.
> > > >
> > > > Any suggestions / enlightenments on this?
> > > > Greets,
> > > > Bart
> > > >
>
>


Re: [KafkaStreams 1.1.1] partition assignment broken?

2018-10-08 Thread Bart Vercammen
Hi John,

Zipping up some logs from our running Kafka cluster is going to be a bit
difficult.
What I can do is try to reproduce this off-line and capture the logs from
there.

We also had a look in the PartitionAssignor source code (for 1.1.1) and
indeed this behaviour is a bit weird
as from the source code I'd expect equally divided partitions.

Anyway, hopefully I'll be able to reproduce this issue with some simple
unit-test like code.
I'll post the results when I have more info.

Greets,
Bart

On Mon, Oct 8, 2018 at 7:36 PM John Roesler  wrote:

> Hi Bart,
>
> This sounds a bit surprising. Is there any chance you can zip up some logs
> so we can see the assignment protocol on the nodes?
>
> Thanks,
> -John
>
> On Mon, Oct 8, 2018 at 4:32 AM Bart Vercammen  wrote:
>
> > Hi,
> >
> > I recently moved some KafkaStreams applications from v0.10.2.1 to v1.1.1
> > and now I notice a weird behaviour in the partition assignment.
> > When starting 4 instances of my Kafka Streams application (on v1.1.1) I
> see
> > that 17 of the 20 partitions (of a source topic) are assigned to 1
> instance
> > of the application while the other 3 instances only get 1 partition
> > assigned. (previously (on v0.10.2.1) they all got 5 partitions.)
> >
> > Is this expected behaviour, as I read that quite some improvements were
> > done in the partition assignment strategy for Kafka Streams applications?
> > If yes, how can I make it so that the partitions are equally divided
> > again
> > across all running applications?   It's a bit weird in my opinion as this
> > makes scaling the application very hard.
> >
> > Also, when initially starting with 1 instance of the application, and
> > gradually scaling up, the new instances only get 1 partition assigned ...
> >
> > All my Streams applications use default configuration (more or less),
> > running 1 stream-thread.
> >
> > Any suggestions / enlightenments on this?
> > Greets,
> > Bart
> >
>


-- 
Kind regards,
Bart Vercammen


clouTrix BVBA
+32 486 69 17 68
i...@cloutrix.com


[KafkaStreams 1.1.1] partition assignment broken?

2018-10-08 Thread Bart Vercammen
Hi,

I recently moved some KafkaStreams applications from v0.10.2.1 to v1.1.1
and now I notice a weird behaviour in the partition assignment.
When starting 4 instances of my Kafka Streams application (on v1.1.1) I see
that 17 of the 20 partitions (of a source topic) are assigned to 1 instance
of the application while the other 3 instances only get 1 partition
assigned. (previously (on v0.10.2.1) they all got 5 partitions.)

Is this expected behaviour, as I read that quite some improvements were
done in the partition assignment strategy for Kafka Streams applications?
If yes, how can I make it so that the partitions are equally divided again
across all running applications?   It's a bit weird in my opinion as this
makes scaling the application very hard.

Also, when initially starting with 1 instance of the application, and
gradually scaling up, the new instances only get 1 partition assigned ...

All my Streams applications use default configuration (more or less),
running 1 stream-thread.
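
For reference, this is (more or less) what each instance looks like; the
application id, broker address and topic name below are made up for this
sketch, and everything else is left at its defaults, with a single stream
thread:

{{
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class MinimalStreamsApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // num.stream.threads defaults to 1; shown explicitly for clarity
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);

        StreamsBuilder builder = new StreamsBuilder();
        // one source topic (20 partitions in the scenario described above)
        builder.<String, String>stream("source-topic")
               .foreach((key, value) -> { /* processing logic */ });

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
}}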

Any suggestions / enlightenments on this?
Greets,
Bart


Re: [kafka streams] discuss: dynamically update subscription pattern

2017-08-16 Thread Bart Vercammen
Hi Guozhang,

In the end I opted for a native Kafka consumer/producer application instead
of using Kafka streams for this.
The overhead of creating new streams applications for each update of the
metadata was a bit too cumbersome.
But still, the issue remains that, although this works (thanks for the
suggestion), I double the data on Kafka.

In order to reduce the data I started moving some of the aggregation logic
from my streams application to this
new application that combines the input streams.  So this leaves me with
only a little logic in the streams application
to preserve the state in its Kafka-backed state-stores.
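
To sketch what that combining application boils down to (topic names, group id
and serialization settings below are invented for the example), the
consume-and-forward core is roughly:

{{
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical sketch of the "combining" application described above;
// topic names and group id are made up for illustration.
public class CombiningPipe {
    public static void main(String[] args) {
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "combining-pipe");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
            // the set of source topics would normally be derived from the metadata topic
            consumer.subscribe(Arrays.asList("source-a", "source-b"));
            while (true) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    // forward (or pre-aggregate) into the single processing topic
                    producer.send(new ProducerRecord<>("aggregate-topic", record.key(), record.value()));
                }
            }
        }
    }
}
}}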

So it is already happening:
when I now implement state handling in my "combining application" I will
end up creating my own stream-processing framework
instead of leveraging Kafka Streams ... this was not really the intention.

So, in short, your suggestion is a nice work-around, but still leaves me
with my initial remark that it would be useful
to somehow be able to alter the subscriptions in a running streams
application.

Bart


On Tue, Aug 15, 2017 at 1:45 PM, Bart Vercammen <b...@cloutrix.com> wrote:

> HI Guozhang,
>
> Thanks for your swift feedback.
> Using your "Pipe App" example might actually be a neat work-around.
> I'll see if I can work out a simple prototype for this on our platform.
>
> The only downside of this is that I will double the message-load on the
> platform (from source-topics to processing-topic)
> But again, I'll try it out and see how far I get with this solution.
>
> Thanks again for sharing your insights,
> Bart
>
>
> On Mon, Aug 14, 2017 at 10:10 PM, Guozhang Wang <wangg...@gmail.com>
> wrote:
>
>> Hello Bart,
>>
>> Thanks for your detailed explanation. I saw your motivation now and it
>> indeed validates itself as a single application that dynamically change
>> subscriptions.
>>
>> As I mentioned Streams today do not have a native support for dynamically
>> changing subscriptions. That being said, If you would not care about the
>> offsets then there may be one sub-optimal workaround: you can have a
>> two-stage pipeline to separate topic consumption with processing, where
>> the
>> first stage is a very simple "Pipe" app (e.g.
>> https://github.com/apache/kafka/blob/trunk/streams/quickstar
>> t/java/src/main/resources/archetype-resources/src/main/java/Pipe.java)
>> which is very lightweight in resource consumption. Each app pipes from a
>> topic to the processing topic which aggregates all the topics that you
>> want
>> to track in the second stage, i.e. the processing stage.
>>
>> Your processing app only reads from this aggregated source topic, while
>> your piping apps pipe each input topic to the aggregate topic. You can
>> read
>> from the global "command" topic to 1) auto generate the code with the
>> source topic swapped with the specified string, compile and execute the
>> code, or 2) shutdown an existing program piping from a topic. This will
>> admittedly introduce a duplicate topic containing the aggregated data, but
>> operational-wise may still be simpler.
>>
>>
>> Guozhang
>>
>>
>> On Mon, Aug 14, 2017 at 2:47 AM, Bart Vercammen <b...@cloutrix.com>
>> wrote:
>>
>> > Hi Guozhang,
>> >
>> > For the use-cases I have in mind, the offset of the source topics is
>> > irrelevant to the state stored by the streams application.
>> > So, when topic 'A' gets dropped and topic 'B' is added,  I would prefer
>> the
>> > application to start reading from 'latest' but that is actually not
>> *that*
>> > important to me.
>> > The main thing is that I'm able somehow to add a new kafka topic to the
>> > source of the streams application at runtime, triggered by messages
>> flowing
>> > on another "metadata" kafka topic.
>> >
>> > So considering your example:
>> > When 'B' is added, I would expect the streams application to start
>> reading
>> > from 'latest' (but not that important)
>> > When 'A' is removed, the state from 'A' is still valid in the state
>> stores,
>> > but 'A' should not be tracked anymore.
>> > When 'A' is added again, I would expect the streams application to start
>> > reading from 'latest' (not the committed offset, but again not that
>> > important for my use-case)
>> >
>> > But this being said, my main focus is on the ability to 'add' new kafka
>> > topics to the application rather than removing them.
>> > What I could do is define a wildcard subscription on all topics.

Re: [kafka streams] discuss: dynamically update subscription pattern

2017-08-15 Thread Bart Vercammen
HI Guozhang,

Thanks for your swift feedback.
Using your "Pipe App" example might actually be a neat work-around.
I'll see if I can work out a simple prototype for this on our platform.

The only downside of this is that I will double the message-load on the
platform (from source-topics to processing-topic).
But again, I'll try it out and see how far I get with this solution.

Thanks again for sharing your insights,
Bart

On Mon, Aug 14, 2017 at 10:10 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Bart,
>
> Thanks for your detailed explanation. I saw your motivation now and it
> indeed validates itself as a single application that dynamically change
> subscriptions.
>
> As I mentioned Streams today do not have a native support for dynamically
> changing subscriptions. That being said, If you would not care about the
> offsets then there may be one sub-optimal workaround: you can have a
> two-stage pipeline to separate topic consumption with processing, where the
> first stage is a very simple "Pipe" app (e.g.
> https://github.com/apache/kafka/blob/trunk/streams/
> quickstart/java/src/main/resources/archetype-resources/
> src/main/java/Pipe.java)
> which is very lightweight in resource consumption. Each app pipes from a
> topic to the processing topic which aggregates all the topics that you want
> to track in the second stage, i.e. the processing stage.
>
> Your processing app only reads from this aggregated source topic, while
> your piping apps pipe each input topic to the aggregate topic. You can read
> from the global "command" topic to 1) auto generate the code with the
> source topic swapped with the specified string, compile and execute the
> code, or 2) shutdown an existing program piping from a topic. This will
> admittedly introduce a duplicate topic containing the aggregated data, but
> operational-wise may still be simpler.
>
>
> Guozhang
>
>
> On Mon, Aug 14, 2017 at 2:47 AM, Bart Vercammen <b...@cloutrix.com> wrote:
>
> > Hi Guozhang,
> >
> > For the use-cases I have in mind, the offset of the source topics is
> > irrelevant to the state stored by the streams application.
> > So, when topic 'A' gets dropped and topic 'B' is added,  I would prefer
> the
> > application to start reading from 'latest' but that is actually not
> *that*
> > important to me.
> > The main thing is that I'm able somehow to add a new kafka topic to the
> > source of the streams application at runtime, triggered by messages
> flowing
> > on another "metadata" kafka topic.
> >
> > So considering your example:
> > When 'B' is added, I would expect the streams application to start
> reading
> > from 'latest' (but not that important)
> > When 'A' is removed, the state from 'A' is still valid in the state
> stores,
> > but 'A' should not be tracked anymore.
> > When 'A' is added again, I would expect the streams application to start
> > reading from 'latest' (not the committed offset, but again not that
> > important for my use-case)
> >
> > But this being said, my main focus is on the ability to 'add' new kafka
> > topics to the application rather than removing them.
> > What I could do is define a wildcard subscription on all topics.  This
> > would update dynamically then, but the problem here is that I will run
> > *all* topics through the application which is major overkill and would
> make
> > it unperformant (especially as there are topics in there that produce a
> lot
> > of data that should not be tracked, but will pass through the streams
> > application then).  Let's say from the 300 kafka topics on the system,
> > about 50 of them need to be tracked by the application.
> > We have the ability to mark these topics through the "metadata" topic, so
> > it would be nice that this could also trigger updating the source-pattern
> > for the subscriptions in the kafka streams application.
> >
> > The problem with multiple applications is the following:
> > - the "state" should be centralized, as it can be queried (having
> multiple
> > applications would make it more difficult to achieve this)
> > - multiple applications will required more resources to be reserved on
> the
> > cluster
> > - We need an external tool to start/stop the streams applications,
> > depending on the info on the metadata topic.
> >
> > Greets,
> > Bart
> >
> >
> > On Mon, Aug 14, 2017 at 3:03 AM, Guozhang Wang <wangg...@gmail.com>
> wrote:
> >
> > > Hello Bart,
> > >
> > > Before we talk about dynamic subscription in Streams 

[kafka streams] discuss: dynamically update subscription pattern

2017-08-11 Thread Bart Vercammen
Hi,

I have a question about the best way to implement something within Kafka
Streams.  The thing I would like to do: dynamically update the subscription
pattern of the source topics.

The reasoning behind this (in my project):
meta data about the source topics is evented on another kafka topic that
should be tracked by the kafka streams topology, and depending on that meta
data specific source topics should be added to, or removed from, the kafka
streams topology.

Currently I track the "meta data topic" as "global state", so that every
processor can actually access it to fetch the meta data (this meta data for
instance also describes whether or not a specific topic pattern should be
tracked by the stream processor) - so consider this as some kind of
"configuration" stream about the source topics.

So here is the question:
Is there any way I could (from a running topology) update the kafka
consumer subscriptions?
So that I'm able to replace the source topic pattern while the topology is
running?

I don't think there currently is a way to do this, but as under the hood it
is just a kafka consumer, my belief is that it should be possible somehow
...

I was thinking about the PartitionAssigner ... if I could get my hands on
that one, maybe I could dynamically configure it to only allow specific
topic-patterns?
Or directly alter the subscription on the underlying consumer?
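
With a plain consumer (outside of Streams) that would indeed work:
subscriptions are not incremental, so calling subscribe() again simply
replaces the pattern and the next rebalance picks it up.  A small sketch
(topic patterns, group id and broker address are invented):

{{
import java.util.Collection;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Sketch only, not Kafka Streams: a plain consumer's subscription pattern
// can be replaced at runtime by calling subscribe() again.
public class ResubscribeSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "resubscribe-demo");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        ConsumerRebalanceListener noOp = new ConsumerRebalanceListener() {
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {}
        };

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Pattern.compile("orders-.*"), noOp);
            consumer.poll(0);  // join the group with the initial pattern

            // later, e.g. triggered by a message on the metadata topic:
            // subscriptions are not incremental, the new pattern replaces the old
            consumer.subscribe(Pattern.compile("orders-.*|payments-.*"), noOp);
            consumer.poll(0);  // the next poll triggers a rebalance with the new pattern
        }
    }
}
}}

Streams, however, does not seem to expose its underlying consumers, which is
why I don't see an obvious way to do the same from a running topology.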

I don't know all the nifty details about the Kafka Streams internals, so it
would be nice if someone could point me in the right direction to achieve
this ...

Thanks,
Bart


Re: [kafka streams] 'null' values in state stores

2017-08-09 Thread Bart Vercammen
Hi Guy,

Indeed, I referenced the wrong source-code, sorry about that ;-)
I created KAFKA-5717 for this.
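
For clarity, this is roughly the behaviour the ticket asks for (a sketch, not
the actual Kafka code): during restore, a null value in the changelog should
delete the key instead of being stored:

{{
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.kafka.common.utils.Bytes;

// Sketch only: restoring a log-compacted changelog into an in-memory map,
// treating null values as tombstones rather than storing them.
class InMemoryRestoreSketch {
    private final NavigableMap<Bytes, byte[]> map = new TreeMap<>();

    void restore(Bytes key, byte[] value) {
        if (value == null) {
            map.remove(key);   // tombstone: entry was deleted on the compacted topic
        } else {
            map.put(key, value);
        }
    }
}
}}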

Thanks

Bart


On Tue, Aug 8, 2017 at 8:08 PM, Damian Guy <damian@gmail.com> wrote:

> The change logger is not used during restoration of the in-memory-store.
> Restoration is handled
> https://github.com/apache/kafka/blob/0.11.0/streams/src/
> main/java/org/apache/kafka/streams/state/internals/
> InMemoryKeyValueStore.java#L79
>
> But, even then it is just putting `null` when it should be deleting it.
> Feel free to raise a JIRA
> Thanks,
> Damian
>
> On Tue, 8 Aug 2017 at 12:09 Bart Vercammen <b...@cloutrix.com> wrote:
>
> > That's RocksDB .. I'm using in-memory stores ...
> > here:
> >
> > https://github.com/apache/kafka/blob/0.11.0/streams/src/
> main/java/org/apache/kafka/streams/state/internals/
> ChangeLoggingKeyValueBytesStore.java#L56
> > the 'null' is not checked ...
> >
> > On Tue, Aug 8, 2017 at 12:52 PM, Damian Guy <damian@gmail.com>
> wrote:
> >
> > > Hi,
> > > The null values are treated as deletes when they are written to the
> > store.
> > > You can see here:
> > > https://github.com/apache/kafka/blob/0.11.0/streams/src/
> > > main/java/org/apache/kafka/streams/state/internals/
> RocksDBStore.java#L261
> > >
> > > On Tue, 8 Aug 2017 at 11:22 Bart Vercammen <b...@cloutrix.com> wrote:
> > >
> > > > Hi,
> > > >
> > > > I noticed the following:
> > > > When a kafka streams application starts, it will restore its state in
> > its
> > > > state-stores (from the log-compacted kafka topic).  All good so far,
> > but
> > > I
> > > > noticed that the 'deleted' entries are actually read into the store
> > > > as 'key' with value: `null`
> > > >
> > > > Is this expected behaviour?  I would assume that 'null' values are
> > > ignored
> > > > when restoring the state as this is exactly how the entries are
> deleted
> > > on
> > > > the log-compacted kafka-topic.
> > > >
> > > > When the compaction has run on the kafka topic, all is fine, but when
> > the
> > > > segment is not compacted yet, these null values are read in.
> > > >
> > > > Greets,
> > > > Bart
> > > >
> > >
>


Re: Kafka 0.10.1.0 consumer group rebalance broken?

2016-11-29 Thread Bart Vercammen
Well, well, Mr. Gruchalski, always nice to talk to you ;-)

Not sure whether it is indeed related to:
https://github.com/apache/kafka/commit/4b003d8bcfffded55a00b8ecc9eed8
eb373fb6c7#diff-d2461f78377b588c4f7e2fe8223d5679R633

But I'll have a look and will try to create a test scenario for this that's
able to reproduce the issue at hand.
I'll also include some logs in my following posts.

Thanks for the reply ... food for thought indeed ...


On Mon, Nov 28, 2016 at 10:17 PM, Radek Gruchalski <ra...@gruchalski.com>
wrote:

> There has been plenty of changes in the GroupCoordinator and co between
> these two releases:
> https://github.com/apache/kafka/commit/40b1dd3f495a59abef8a0cba545052
> 6994c92c04#diff-96e4cf31cd54def6b2fb3f7a118c1db3
>
> It might be related to this:
> https://github.com/apache/kafka/commit/4b003d8bcfffded55a00b8ecc9eed8
> eb373fb6c7#diff-d2461f78377b588c4f7e2fe8223d5679R633
>
> If your group is empty, your group is marked dead, when the group is dead,
> no matter what you do, it’ll reply with:
> https://github.com/apache/kafka/blob/trunk/core/src/
> main/scala/kafka/coordinator/GroupCoordinator.scala#L353
>
> Food for thought.
>
> –
> Best regards,
> Radek Gruchalski
> ra...@gruchalski.com
>
>
> On November 28, 2016 at 9:04:16 PM, Bart Vercammen (b...@cloutrix.com)
> wrote:
>
> Hi,
>
> It seems that consumer group rebalance is broken in Kafka 0.10.1.0 ?
> When running a small test-project :
> - consumers running in own JVM (with different 'client.id')
> - producer running in own JVM
> - kafka broker : the embedded kafka : KafkaServerStartable
>
> It looks like the consumers lose their heartbeat after a rebalance got
> triggered.
> In the logs on the consumer I can actually see that the heartbeat failed
> due to "invalid member_id"
>
> When running the exact same code on a 0.10.0.1 setup, all works perfectly.
> Anyone else seen this problem?
>
> Greets,
> Bart
>
>


-- 
Kind regards,
Bart Vercammen

clouTrix BVBA
+32 486 69 17 68
i...@cloutrix.com


Kafka 0.10.1.0 consumer group rebalance broken?

2016-11-28 Thread Bart Vercammen
Hi,

It seems that consumer group rebalance is broken in Kafka 0.10.1.0 ?
When running a small test-project :
- consumers running in own JVM (with different 'client.id')
- producer running in own JVM
- kafka broker : the embedded kafka : KafkaServerStartable

It looks like the consumers lose their heartbeat after a rebalance got
triggered.
In the logs on the consumer I can actually see that the heartbeat failed
due to "invalid member_id"

When running the exact same code on a 0.10.0.1 setup, all works perfectly.
Anyone else seen this problem?

Greets,
Bart