Kafka Streams incorrect aggregation results when re-balancing occurs

2019-08-19 Thread Alex Brekken
Hi all, I have a (relatively) simple streams topology that is producing
some counts, and while testing this code I'm seeing some occasional
incorrect aggregation results.  This seems to happen when a re-balance
occurs - typically due to a timeout or communication hiccup with the Kafka
broker.  The topology is built with the DSL, and utilizes 2 KTables: the
first is really just a de-dup table and the second is the result of the
aggregation.  So at a high level the topology consumes from a source topic,
groupByKey() and then does a reduce() where we always return the
newValue.  Then it does a groupBy() on a new key, and finally an
aggregate() call with an adder and subtractor.  Because our source topic
frequently contains duplicate messages, this seemed like a good way to
handle the dupes: the subtractor gets invoked anytime we replace a value in
the "upstream" KTable and removes it from the count, then adds it back
again in the adder.
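For readers unfamiliar with the adder/subtractor pattern, here is a plain-Java model of the semantics described above (this is deliberately not actual Streams DSL code, and the class and field names are made up): a "reduce" table keeps the latest value per key, and an "aggregate" table keeps a count per group, with the subtractor removing the old value's contribution before the adder applies the new one.

```java
import java.util.HashMap;
import java.util.Map;

// Models the described two-KTable topology: latestByKey plays the role of
// the de-dup KTable (reduce always keeping newValue), countByGroup plays
// the role of the aggregate KTable keyed by the grouped value.
class AdderSubtractorModel {
    final Map<String, String> latestByKey = new HashMap<>();
    final Map<String, Integer> countByGroup = new HashMap<>();

    // Process one record; for simplicity the group key is the value itself.
    void process(String key, String value) {
        String old = latestByKey.put(key, value); // reduce(): always keep newValue
        if (old != null) {
            // subtractor: the replaced value leaves its group's count
            countByGroup.merge(old, -1, Integer::sum);
        }
        // adder: the new value joins its group's count
        countByGroup.merge(value, 1, Integer::sum);
    }
}
```

Processing a duplicate record twice leaves its group's count at 1, because the subtractor first removes the stale contribution, which is why the pattern handles dupes in the happy path.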

In the happy-path scenario where we never see any exceptions or rebalances,
this whole thing works great - the counts at the end are 100% correct.  But
when rebalancing is triggered we sometimes get bad counts. My theory is
that when a timeout or connectivity problem happens during that second
aggregation, the data ends up getting saved to the state store but the
offsets don't get committed and the message(s) in the repartition topic
that feed the aggregation get replayed after the stream task gets
rebalanced, causing the counts to get incorrectly incremented or
decremented (depending on whether the message triggered the adder or
the subtractor). I can simulate this problem (or something similar to this
problem) when debugging the application in my IDE just by pausing execution
on a breakpoint inside the aggregation's adder or subtractor method for a
few seconds.  The result of the adder or subtractor gets saved to the state
store which means that when the messages in the repartition topic get
re-processed, the counts get doubled.  If I enable "exactly_once"
processing, I'm unable to recreate the problem and the counts are always
accurate.

My questions are:

1.  Is this expected behavior? In a hostile application environment where
connectivity problems and rebalances happen frequently, is some degree of
incorrectly aggregated data just a fact of life?

2.  Is exactly_once processing the right solution if correctness is of
highest importance?  Or should I be looking at different ways of writing
the topology?
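For reference, exactly-once processing is switched on with a single config entry. A minimal sketch, assuming the 2.x clients current as of this thread (the string key below is the plain form of StreamsConfig.PROCESSING_GUARANTEE_CONFIG, and the application id and broker address are placeholders):

```java
import java.util.Properties;

class EosConfig {
    // Builds the minimal Streams properties with exactly-once enabled.
    static Properties streamsProps() {
        Properties props = new Properties();
        props.put("application.id", "count-app");          // placeholder app id
        props.put("bootstrap.servers", "localhost:9092");  // placeholder broker list
        // Default is "at_least_once"; "exactly_once" enables EOS on 2.x clients
        // (newer clients also offer "exactly_once_v2").
        props.put("processing.guarantee", "exactly_once");
        return props;
    }
}
```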

Thanks for any advice!

Alex


Re: list of pattern processed topic list

2019-08-19 Thread Kamal Chandraprakash
You can use the KafkaConsumer#assignment() method to get all the assigned
topic-partitions for that consumer instance.
But you have to call the poll method periodically to keep the assignment
up to date, and that poll may return records.
This shortcoming is actively being discussed in the threads below.

https://mail-archives.apache.org/mod_mbox/kafka-dev/201908.mbox/<08030a68-3f0b-42db-9b79-dfcd3200cf25%40www.fastmail.com>

https://cwiki.apache.org/confluence/display/KAFKA/KIP-505%3A+Add+new+public+method+to+only+update+assignment+metadata+in+consumer
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97551484


On Mon, Aug 19, 2019 at 6:33 PM M. Manna  wrote:

> Kafka doesn’t expose the set of topics matched by a regex subscription. You
> could get a list of available topics using the admin client. From there, you
> can decide what topics you would
> like to subscribe to (or process using your logic).
>
> Check the documentation for AdminClient.listTopics and the ListTopicsResult holder.
>
> I hope this helps.
>
> Regards,
>
> On Mon, 19 Aug 2019 at 13:22, Upendra Yadav  wrote:
>
> > Hi,
> > I have initialised kafka consumer:
> > KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(consumerConfig);
> >
> > and subscribed with topic pattern:
> > consumer.subscribe(Pattern.compile(topicRegex), listener);
> >
> > now, I'm trying to get the list of topics.
> > Map<String, List<PartitionInfo>> topicsPartitions = consumer.listTopics();
> >
> > here topic list is not following given pattern. it is giving all topic
> > list.
> > Is this expected behaviour? Is there any API to get subscribed topic list
> > with consumer instance?
> > or Do I need to write simple prog to process this list with same pattern?
> >
>


Re: list of pattern processed topic list

2019-08-19 Thread M. Manna
Kafka doesn’t expose the set of topics matched by a regex subscription. You
could get a list of available topics using the admin client. From there, you
can decide what topics you would
like to subscribe to (or process using your logic).

Check the documentation for AdminClient.listTopics and the ListTopicsResult holder.

I hope this helps.

Regards,

On Mon, 19 Aug 2019 at 13:22, Upendra Yadav  wrote:

> Hi,
> I have initialised kafka consumer:
> KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(consumerConfig);
>
> and subscribed with topic pattern:
> consumer.subscribe(Pattern.compile(topicRegex), listener);
>
> now, I'm trying to get the list of topics.
> Map<String, List<PartitionInfo>> topicsPartitions = consumer.listTopics();
>
> here topic list is not following given pattern. it is giving all topic
> list.
> Is this expected behaviour? Is there any API to get subscribed topic list
> with consumer instance?
> or Do I need to write simple prog to process this list with same pattern?
>


list of pattern processed topic list

2019-08-19 Thread Upendra Yadav
Hi,
I have initialised a Kafka consumer:
KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(consumerConfig);

and subscribed with topic pattern:
consumer.subscribe(Pattern.compile(topicRegex), listener);

now, I'm trying to get the list of topics.
Map<String, List<PartitionInfo>> topicsPartitions = consumer.listTopics();

Here the topic list does not follow the given pattern; it returns all topics.
Is this expected behaviour? Is there any API to get the subscribed topic list
from the consumer instance, or do I need to write a simple program to filter
this list with the same pattern?
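On the last question: until the consumer exposes its matched set directly, filtering the key set of listTopics() with the same regex is a workable fallback. A minimal sketch, with made-up topic names (the consumer itself applies the pattern with matches(), so the filter below mirrors that):

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

class TopicFilter {
    // Keeps only the topic names (e.g. from consumer.listTopics().keySet())
    // that match the same regex passed to consumer.subscribe(Pattern, ...).
    static Set<String> matching(Set<String> allTopics, String topicRegex) {
        Pattern p = Pattern.compile(topicRegex);
        Set<String> out = new TreeSet<>();
        for (String t : allTopics) {
            if (p.matcher(t).matches()) {
                out.add(t);
            }
        }
        return out;
    }
}
```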