Can two different kafka clusters be used in kafka streams for consumption from 1 cluster and produce on another cluster

2024-04-11 Thread Pushkar Deole
Hi All,

We are using a streams application, and currently the application uses a
common Kafka cluster that is shared with many other applications.
Our application consumes events from topics that are populated by other
applications, processes those events, and produces new events onto other
topics that are specific to our application. This is done through a streams
topology.
Now, we want to separate our application and the topics related only to our
application onto a separate Kafka cluster. For this separation, our
application would still consume from the common cluster but produce the
processed data onto the new cluster. Is this possible through Kafka Streams,
i.e. can the consumer in the streams application consume from one cluster
while the sink writes to another cluster?
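
For reference, the intended flow can be sketched with the plain consumer and
producer clients, each pointed at its own cluster via bootstrap.servers. This
is only an illustration of the data flow (not a Streams topology), and the
cluster addresses, topic names, and group id below are placeholders:

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class CrossClusterBridge {
    public static void main(String[] args) {
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "shared-cluster:9092");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "bridge-app");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "new-cluster:9092");
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            consumer.subscribe(List.of("shared-input-topic"));
            while (true) {
                // read from the shared cluster, transform, and write to the new cluster
                for (ConsumerRecord<String, String> rec : consumer.poll(Duration.ofMillis(500))) {
                    String processed = rec.value(); // processing logic would go here
                    producer.send(new ProducerRecord<>("app-output-topic", rec.key(), processed));
                }
            }
        }
    }
}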


Re: kafka streams consumer group reporting lag even on source topics removed from topology

2023-09-05 Thread Pushkar Deole
I think I figured out a way. There are commands that can be executed from the
kafka CLI to disassociate a consumer group from topics that are no longer
being consumed.
With this sort of command, I could delete the consumer offsets of a consumer
group for a specific topic, and that resolved the lag problem:

kafka-consumer-groups --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS \
  --command-config ~/kafka.properties --delete-offsets \
  --group "<consumer-group>" --topic "<topic>"

> As long as the consumer group is active, nothing will be deleted. That
> is the reason why you get those incorrect alerts -- Kafka cannot know
> that you stopped consuming from those topics. (That is what I tried to
> explain -- seems I did a bad job...)
>
> Changing the group.id is tricky because Kafka Streams uses it to
> identify internal topic names (for repartition and changelog topics), and
> thus your app would start with newly created (and thus empty) topics. --
> You might want to restart the app with `auto.offset.reset = "earliest"`
> and reprocess all available input to re-create state.
>
>
> -Matthias
>
> On 8/19/23 8:07 AM, Pushkar Deole wrote:
> > @matthias
> >
> > what are the alternatives to get rid of this issue? When the lag starts
> > increasing, we have alerts configured on our monitoring system in Datadog
> > which starts sending alerts and alarms to reliability teams. I know that in
> > Kafka an inactive consumer group is cleaned up after 7 days, however I am not
> > sure whether that also applies to topics that were consumed previously but
> > are not consumed now.
> >
> > Is creating a new consumer group (setting a different application.id) on
> > the streams application an option here?
> >
> >
> > On Thu, Aug 17, 2023 at 7:03 AM Matthias J. Sax 
> wrote:
> >
> >> Well, it's kinda expected behavior. It's a split brain problem.
> >>
> >> In the end, you use the same `application.id / group.id` and thus the
> >> committed offsets for the removed topics are still in
> >> `__consumer_offsets` topics and associated with the consumer group.
> >>
> >> If a tool inspects lags and compares the latest committed offsets to
> >> end-offsets it looks for everything it finds in the `__consumer_offsets`
> >> topics for the group in question -- the tool cannot know that you
> >> changed the application and that it does not read from those topics any
> >> longer (and thus does not commit any longer).
> >>
> >> I am not sure from top of my head if you could do a manual cleanup for
> >> the `application.id` and topics in question and delete the committed
> >> offsets from the `__consumer_offsets` topic -- try to checkout `Admin`
> >> client and/or the command line tools...
> >>
> >> I know that it's possible to delete committed offsets for a consumer
> >> group (if a group becomes inactive, the broker would also clean up all
> >> group metadata after a configurable timeout), but I am not sure if
> >> that's for the entire consumer group (i.e., all topics) or if you can do it
> >> on a per-topic basis, too.
> >>
> >>
> >> HTH,
> >> -Matthias
> >>
> >>
> >> On 8/16/23 2:11 AM, Pushkar Deole wrote:
> >>> Hi streams Dev community  @matthias, @bruno
> >>>
> >>> Any inputs on above issue? Is this a bug in the streams library wherein
> >> the
> >>> input topic removed from streams processor topology, the underlying
> >>> consumer group still reporting lag against those?
> >>>
> >>> On Wed, Aug 9, 2023 at 4:38 PM Pushkar Deole 
> >> wrote:
> >>>
> >>>> Hi All,
> >>>>
> >>>> I have a streams application with 3 instances with application-id set
> to
> >>>> applicationV1. The application uses processor API with reading from
> >> source
> >>>> topics, processing the data and writing to destination topic.
> >>>> Currently it consumes from 6 source topics however we don't need to
> >>>> process data any more from 2 of those topics so we removed 2 topics
> from
> >>>> the source topics list. We have configured Datadog dashboard to report
> >> and
> >>>> alert on consumer lag so after removing the 2 source topics and
> >> deploying
> >>>> application, we started getting several alerts about consumer lag on
> >>>> applicationV1 consumer group which is underlying consumer group of the
> >>>> streams application. When we looked at the consumer group from
> >> kafka-cli,
> >>>> we could see that the consumer group is reporting lag against the
> topics
> >>>> removed from source topic list which is reflecting as increasing lag
> on
> >>>> Datadog monitoring.
> >>>>
> >>>> Can someone advise if this is expected behavior? In my opinion, this
> is
> >>>> not expected since streams application no more has those topics as
> part
> >> of
> >>>> source, it should not report lag on those.
> >>>>
> >>>
> >>
> >
>
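
For reference, the Admin-client route mentioned in the quoted reply above can
also be sketched in Java; deleteConsumerGroupOffsets() (Kafka 2.4+) removes
committed offsets on a per-partition basis. The bootstrap address, topic name,
and partition count below are assumptions, and the call fails if the group is
still actively subscribed to the topic:

import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.TopicPartition;

public class DeleteOffsetsForRemovedTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            Set<TopicPartition> partitions = new HashSet<>();
            for (int p = 0; p < 6; p++) {                       // assumes 6 partitions
                partitions.add(new TopicPartition("removed-topic", p));
            }
            // Deletes committed offsets for only these partitions of the group,
            // so lag is no longer reported against the removed topic.
            admin.deleteConsumerGroupOffsets("applicationV1", partitions).all().get();
        }
    }
}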


Re: kafka streams consumer group reporting lag even on source topics removed from topology

2023-08-19 Thread Pushkar Deole
@matthias

what are the alternatives to get rid of this issue? When the lag starts
increasing, we have alerts configured on our monitoring system in Datadog
which starts sending alerts and alarms to reliability teams. I know that in
Kafka an inactive consumer group is cleaned up after 7 days, however I am not
sure whether that also applies to topics that were consumed previously but are
not consumed now.

Is creating a new consumer group (setting a different application.id) on the
streams application an option here?


On Thu, Aug 17, 2023 at 7:03 AM Matthias J. Sax  wrote:

> Well, it's kinda expected behavior. It's a split brain problem.
>
> In the end, you use the same `application.id / group.id` and thus the
> committed offsets for the removed topics are still in
> `__consumer_offsets` topics and associated with the consumer group.
>
> If a tool inspects lags and compares the latest committed offsets to
> end-offsets it looks for everything it finds in the `__consumer_offsets`
> topics for the group in question -- the tool cannot know that you
> changed the application and that it does not read from those topics any
> longer (and thus does not commit any longer).
>
> I am not sure from top of my head if you could do a manual cleanup for
> the `application.id` and topics in question and delete the committed
> offsets from the `__consumer_offsets` topic -- try to checkout `Admin`
> client and/or the command line tools...
>
> I know that it's possible to delete committed offsets for a consumer
> group (if a group becomes inactive, the broker would also clean up all
> group metadata after a configurable timeout), but I am not sure if
> that's for the entire consumer group (i.e., all topics) or if you can do it
> on a per-topic basis, too.
>
>
> HTH,
>-Matthias
>
>
> On 8/16/23 2:11 AM, Pushkar Deole wrote:
> > Hi streams Dev community  @matthias, @bruno
> >
> > Any inputs on above issue? Is this a bug in the streams library wherein
> the
> > input topic removed from streams processor topology, the underlying
> > consumer group still reporting lag against those?
> >
> > On Wed, Aug 9, 2023 at 4:38 PM Pushkar Deole 
> wrote:
> >
> >> Hi All,
> >>
> >> I have a streams application with 3 instances with application-id set to
> >> applicationV1. The application uses processor API with reading from
> source
> >> topics, processing the data and writing to destination topic.
> >> Currently it consumes from 6 source topics however we don't need to
> >> process data any more from 2 of those topics so we removed 2 topics from
> >> the source topics list. We have configured Datadog dashboard to report
> and
> >> alert on consumer lag so after removing the 2 source topics and
> deploying
> >> application, we started getting several alerts about consumer lag on
> >> applicationV1 consumer group which is underlying consumer group of the
> >> streams application. When we looked at the consumer group from
> kafka-cli,
> >> we could see that the consumer group is reporting lag against the topics
> >> removed from source topic list which is reflecting as increasing lag on
> >> Datadog monitoring.
> >>
> >> Can someone advise if this is expected behavior? In my opinion, this is
> >> not expected since streams application no more has those topics as part
> of
> >> source, it should not report lag on those.
> >>
> >
>


Re: kafka streams consumer group reporting lag even on source topics removed from topology

2023-08-16 Thread Pushkar Deole
Hi streams Dev community  @matthias, @bruno

Any inputs on the above issue? Is this a bug in the streams library, wherein
for input topics removed from the streams processor topology the underlying
consumer group still reports lag against them?

On Wed, Aug 9, 2023 at 4:38 PM Pushkar Deole  wrote:

> Hi All,
>
> I have a streams application with 3 instances with application-id set to
> applicationV1. The application uses processor API with reading from source
> topics, processing the data and writing to destination topic.
> Currently it consumes from 6 source topics however we don't need to
> process data any more from 2 of those topics so we removed 2 topics from
> the source topics list. We have configured Datadog dashboard to report and
> alert on consumer lag so after removing the 2 source topics and deploying
> application, we started getting several alerts about consumer lag on
> applicationV1 consumer group which is underlying consumer group of the
> streams application. When we looked at the consumer group from kafka-cli,
> we could see that the consumer group is reporting lag against the topics
> removed from source topic list which is reflecting as increasing lag on
> Datadog monitoring.
>
> Can someone advise if this is expected behavior? In my opinion, this is
> not expected since streams application no more has those topics as part of
> source, it should not report lag on those.
>


kafka streams consumer group reporting lag even on source topics removed from topology

2023-08-09 Thread Pushkar Deole
Hi All,

I have a streams application with 3 instances, with application-id set to
applicationV1. The application uses the processor API, reading from source
topics, processing the data, and writing to a destination topic.
Currently it consumes from 6 source topics; however, we no longer need to
process data from 2 of those topics, so we removed those 2 topics from the
source topics list. We have configured a Datadog dashboard to report and
alert on consumer lag, so after removing the 2 source topics and deploying
the application, we started getting several alerts about consumer lag on the
applicationV1 consumer group, which is the underlying consumer group of the
streams application. When we looked at the consumer group from the kafka-cli,
we could see that it is reporting lag against the topics removed from the
source topic list, which shows up as increasing lag in Datadog monitoring.

Can someone advise if this is expected behavior? In my opinion it is not
expected: since the streams application no longer has those topics as part of
its source, it should not report lag on them.


Re: kafka streams re-partitioning on incoming events

2023-07-25 Thread Pushkar Deole
Thanks a lot Bruno!

I am just trying the Processor API as you mentioned above, so the processor
will write the record to another Kafka topic with the new key. I am having
difficulty reading from that Kafka topic in another processor, and am wondering
whether I need to create another stream with the intermediate Kafka topic as
its source. Otherwise, how can I read from the intermediate topic in another
processor?
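
In case it helps frame the question, a minimal sketch of that two-stage
Processor API setup (re-key, write to an intermediate topic, read it back with
a second source) might look like the following; topic names, serdes, and the
call-id extraction are placeholders, and it uses the pre-2.6 AbstractProcessor
API to match the 2.5.x line mentioned elsewhere in these threads:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.AbstractProcessor;

public class RepartitionTopologySketch {
    public static Topology build() {
        Topology topology = new Topology();

        // Stage 1: read the original events, re-key them on the call id, and write
        // them to an intermediate topic (which you create and manage yourself).
        topology.addSource("original-source", Serdes.String().deserializer(),
                Serdes.String().deserializer(), "input-topic");
        topology.addProcessor("rekey", () -> new AbstractProcessor<String, String>() {
            @Override
            public void process(String key, String value) {
                context().forward(extractCallId(value), value);
            }
        }, "original-source");
        topology.addSink("intermediate-sink", "rekeyed-topic",
                Serdes.String().serializer(), Serdes.String().serializer(), "rekey");

        // Stage 2: a second source reads the intermediate topic back into the same
        // topology, so the downstream processor sees all events of one call id together.
        topology.addSource("rekeyed-source", Serdes.String().deserializer(),
                Serdes.String().deserializer(), "rekeyed-topic");
        topology.addProcessor("main", () -> new AbstractProcessor<String, String>() {
            @Override
            public void process(String key, String value) {
                // actual processing logic goes here
            }
        }, "rekeyed-source");

        return topology;
    }

    private static String extractCallId(String value) {
        return value; // placeholder for the real call-id extraction
    }
}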

On Fri, Jul 14, 2023 at 9:25 PM Bruno Cadonna  wrote:

> Hi Pushkar,
>
> The events after repartitioning are processed by a different task than
> the task that read the events from the source topic. The task assignor
> assigns those tasks to stream threads. So events with the same key will
> be processed by the same task. As far as I understood from your earlier
> message, you want that events with same call ID after the repartitioning
> are processed by the same applications instance. If same keys are
> processed by the same task then that implies that the events  are
> processed on the same application instance.
>
> With the Processor API you can also repartition data by specifying a new
> key for each record and writing the records to a Kafka topic. With
> another processor you can then read the repartitioned events from that
> topic. However, you have to manage the intermediate topic yourself.
>
> In the DSL there is the process() method that allows to use custom code
> for processing events similar to the Processor API.
>
> Best,
> Bruno
>
> On 7/14/23 5:09 PM, Pushkar Deole wrote:
> > Thanks Bruno..
> >
> > What do you mean exactly with "...and then process them in that order"?
> >
> > By this, I mean to say if the order of events in partition will be
> > processed after repartition. Probably I don't need to go through internal
> > details but does the partitions of topic are again assigned to stream
> > threads tasks and hence could be processed by another stream thread than
> > the one that read the event from source partition?
> >
> > Also, the repartition method is available as part of Streams DSL API. We
> > seem to be using Streams processor API so is there a similar way to
> achieve
> > repartitioning in Processor API?
> >
> > Or if we want to go other way round, if we want to move from Processor
> API
> > to Streams DSL then how can we migrate, i.e. where do we store the logic
> > that we normally store in Processor in Topology?
> >
> > On Fri, Jul 14, 2023 at 3:50 PM Bruno Cadonna 
> wrote:
> >
> >> Hi Pushkar,
> >>
> >> you can use repartition() to repartition your data. Method through() is
> >> actually deprecated in favor of repartition(). Before you repartition
> >> you need to specify the new key with selectKey().
> >>
> >> What do you mean exactly with "...and then process them in that order"?
> >>
> >> The order of the events from the same source partition (partition before
> >> repartitioning) that have the same call ID (or more generally that end
> >> up in the same partition after repartitioning) will be preserved but
> >> Kafka does not guarantee the order of events from different source
> >> partitions.
> >>
> >> Best,
> >> Bruno
> >>
> >> On 7/9/23 2:45 PM, Pushkar Deole wrote:
> >>> Hi,
> >>>
> >>> We have a kafka streams application that consumes from multiple topic
> >> with
> >>> different keys. Before processing these events in the application, we
> >> want
> >>> to repartition those events on a single key that will ensure related
> >> events
> >>> are processed by same application instance. e.g. the events on multiple
> >>> topics are related to same call however they are keyed on multiple keys
> >> on
> >>> those topics and hence go to different application instances but after
> >>> consuming those events by the streams, we want to partition them again
> on
> >>> the call id and then process them in that order. Is this possible?
> >>> I got to know about through() method on the KStream which seems to be
> >> doing
> >>> similar thing however not sure if it would achieve the below
> >> functionality:
> >>>
> >>> Call with id C123 is initiated and following events arrive on 3 topics
> >> with
> >>> respective keys:
> >>>
> >>> Event 1 on TopicA: with key a1
> >>> Event 2 on TopicB: with key b1
> >>> Event 3 on TopicC: with key c1
> >>>
> >>> Let's assume these are consumed by 3 different instances of kafka
> streams
> >>> application however those application process them with through()
> method
> >>> with a consolidatedTopic with the key C123.
> >>> Will this ensure that these 3 events go to the same partition on
> >>> consolidatedTopic, and hence after repartitioning, those will be
> consumed
> >>> by same instance of application?
> >>>
> >>
> >
>


Re: kafka streams re-partitioning on incoming events

2023-07-14 Thread Pushkar Deole
Thanks Bruno..

What do you mean exactly with "...and then process them in that order"?

By this, I mean whether the order of events within a partition will be
preserved when they are processed after repartitioning. I probably don't need
to go through internal details, but are the partitions of the repartition topic
assigned again to stream threads/tasks, and hence could events be processed by
a different stream thread than the one that read them from the source
partition?

Also, the repartition method is available as part of the Streams DSL API. We
are using the Streams Processor API, so is there a similar way to achieve
repartitioning with the Processor API?

Or, if we want to go the other way round and move from the Processor API to
the Streams DSL, how can we migrate, i.e. where do we put the logic that we
normally put in a Processor into the topology?
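
For the DSL route described in Bruno's reply below, a minimal sketch of
re-keying on the call id and repartitioning might look like this; repartition()
exists from Kafka Streams 2.6 onward, and the topic name, serdes, and call-id
extraction are placeholders:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Repartitioned;

public class DslRepartitionSketch {
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> events = builder.stream("input-topic"); // relies on default String serdes
        events
            .selectKey((key, value) -> extractCallId(value))            // new key = call id
            .repartition(Repartitioned.<String, String>as("by-call-id")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(Serdes.String()))
            // after the repartition, all events with the same call id are handled by the same task
            .foreach((callId, value) -> { /* processing logic goes here */ });
        return builder.build();
    }

    private static String extractCallId(String value) {
        return value; // placeholder for the real call-id extraction
    }
}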

On Fri, Jul 14, 2023 at 3:50 PM Bruno Cadonna  wrote:

> Hi Pushkar,
>
> you can use repartition() to repartition your data. Method through() is
> actually deprecated in favor of repartition(). Before you repartition
> you need to specify the new key with selectKey().
>
> What do you mean exactly with "...and then process them in that order"?
>
> The order of the events from the same source partition (partition before
> repartitioning) that have the same call ID (or more generally that end
> up in the same partition after repartitioning) will be preserved but
> Kafka does not guarantee the order of events from different source
> partitions.
>
> Best,
> Bruno
>
> On 7/9/23 2:45 PM, Pushkar Deole wrote:
> > Hi,
> >
> > We have a kafka streams application that consumes from multiple topic
> with
> > different keys. Before processing these events in the application, we
> want
> > to repartition those events on a single key that will ensure related
> events
> > are processed by same application instance. e.g. the events on multiple
> > topics are related to same call however they are keyed on multiple keys
> on
> > those topics and hence go to different application instances but after
> > consuming those events by the streams, we want to partition them again on
> > the call id and then process them in that order. Is this possible?
> > I got to know about through() method on the KStream which seems to be
> doing
> > similar thing however not sure if it would achieve the below
> functionality:
> >
> > Call with id C123 is initiated and following events arrive on 3 topics
> with
> > respective keys:
> >
> > Event 1 on TopicA: with key a1
> > Event 2 on TopicB: with key b1
> > Event 3 on TopicC: with key c1
> >
> > Let's assume these are consumed by 3 different instances of kafka streams
> > application however those application process them with through() method
> > with a consolidatedTopic with the key C123.
> > Will this ensure that these 3 events go to the same partition on
> > consolidatedTopic, and hence after repartitioning, those will be consumed
> > by same instance of application?
> >
>


Re: kafka streams re-partitioning on incoming events

2023-07-14 Thread Pushkar Deole
Hello, *Kafka dev community, @matthiasJsax*

Can you comment on the question below? It is very important for us since we
are seeing inconsistencies due to the current design.

On Sun, Jul 9, 2023 at 6:15 PM Pushkar Deole  wrote:

> Hi,
>
> We have a kafka streams application that consumes from multiple topic with
> different keys. Before processing these events in the application, we want
> to repartition those events on a single key that will ensure related events
> are processed by same application instance. e.g. the events on multiple
> topics are related to same call however they are keyed on multiple keys on
> those topics and hence go to different application instances but after
> consuming those events by the streams, we want to partition them again on
> the call id and then process them in that order. Is this possible?
> I got to know about through() method on the KStream which seems to be
> doing similar thing however not sure if it would achieve the below
> functionality:
>
> Call with id C123 is initiated and following events arrive on 3 topics
> with respective keys:
>
> Event 1 on TopicA: with key a1
> Event 2 on TopicB: with key b1
> Event 3 on TopicC: with key c1
>
> Let's assume these are consumed by 3 different instances of kafka streams
> application however those application process them with through() method
> with a consolidatedTopic with the key C123.
> Will this ensure that these 3 events go to the same partition on
> consolidatedTopic, and hence after repartitioning, those will be consumed
> by same instance of application?
>


kafka streams re-partitioning on incoming events

2023-07-09 Thread Pushkar Deole
Hi,

We have a Kafka Streams application that consumes from multiple topics with
different keys. Before processing these events in the application, we want
to repartition them on a single key that will ensure related events are
processed by the same application instance. E.g. the events on multiple
topics are related to the same call, however they are keyed on different keys
on those topics and hence go to different application instances; after the
streams application consumes those events, we want to partition them again on
the call id and then process them in that order. Is this possible?
I came across the through() method on KStream, which seems to do something
similar, however I am not sure whether it would achieve the functionality below:

Call with id C123 is initiated and following events arrive on 3 topics with
respective keys:

Event 1 on TopicA: with key a1
Event 2 on TopicB: with key b1
Event 3 on TopicC: with key c1

Let's assume these are consumed by 3 different instances of the Kafka Streams
application, and each instance processes them with the through() method,
writing to a consolidatedTopic with the key C123.
Will this ensure that these 3 events go to the same partition of
consolidatedTopic, and hence that after repartitioning they will be consumed
by the same instance of the application?


Re: kafka streams partition assignor strategy for version 2.5.1 - does it use sticky assignment

2023-04-16 Thread Pushkar Deole
Thanks John... however I have a few more questions:

How does this configuration work along with static group membership
protocol? Or does this work only with dynamic group membership and not work
well when static membership is configured?

Secondly, I gather that streams doesn't immediately trigger a rebalance when
the stream is closed on the instance being shut down, but only once the
session.timeout expires. So how does the no-downtime configuration you
mentioned work, given there will be downtime until the session.timeout expires?
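
For the standby / warm-up behaviour John describes in the reply below, the
relevant Streams configs (available from 2.6 onward) would be set roughly as
follows; the values here are only illustrative, not recommendations:

import java.util.Properties;

Properties props = new Properties();
props.put("num.standby.replicas", "1");               // keep a hot standby per stateful task
props.put("max.warmup.replicas", "2");                // extra copies allowed while warming up
props.put("acceptable.recovery.lag", "10000");        // max lag for a replica to count as "caught up"
props.put("probing.rebalance.interval.ms", "600000"); // how often Streams probes warm-up progress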

On Sat, Apr 15, 2023 at 8:13 PM John Roesler  wrote:

> Hi Pushkar,
>
> In 2.5, Kafka Streams used an assignor that tried to strike a compromise
> between stickiness and workload balance, so you would observe some
> stickiness, but not all the time.
>
> In 2.6, we introduced the "high availability task assignor" (see KIP-441
> https://cwiki-test.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams).
> This assignor is guaranteed to always assign tasks to the instance that is
> most caught up (typically, this would be the instance that was already the
> active processor, which is equivalent to stickiness). In the case of losing
> an instance (eg the pod gets replaced), any standby replica would be
> considered "most caught up" and would take over processing with very little
> downtime.
>
> The new assignor achieves balance over time by "warming up" tasks in the
> background on other instances and then swaps the assignment over to them
> when they are caught up.
>
> So, if you upgrade Streams, you should be able to configure at least one
> standby task and then be able to implement the "rolling replacement"
> strategy you described. If you are willing to wait until Streams gradually
> balances the assignment over time after each replacement, then you can
> cycle out the whole cluster without ever having downtime or developing
> workload skew. Note that there are several configuration parameters you can
> adjust to speed up the warm-up process:
> https://cwiki-test.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP441:SmoothScalingOutforKafkaStreams-Parameters
> .
>
> I hope this helps!
> -John
>
> On 2023/04/14 17:41:19 Pushkar Deole wrote:
> > Any inputs on below query?
> >
> > On Wed, Apr 12, 2023 at 2:22 PM Pushkar Deole 
> wrote:
> >
> > > Hi All,
> > >
> > > We are using version 2.5.1 of kafka-streams with 3 application
> instances
> > > deployed as 3 kubernetes pods.
> > > It consumes from multiple topics, each with 6 partitions.
> > > I would like to know if streams uses sticky partition assignor strategy
> > > internally since we can't set it externally on streams.
> > >
> > > My scenario is like this: during rolling upgrades
> > > Step 1: 1 new pod comes up so there are 4 pods, with some partitions
> > > assigned to newly created pod and k8s then deletes one of older pods,
> so it
> > > is pod1, pod2, pod3 (older) and pod4 (newer). Then pod1 is deleted. So
> > > ultimately pod2, pod3, pod4
> > >
> > > Step 2: K8s then repeats same for another old pod i.e. create a new pod
> > > and then delete old pod. So pod2, pod3, pod4, pod5 and then delete
> pod2. So
> > > ultimately pod3, pod4 and pod5
> > >
> > > The question I have here is: will kafka streams try to sticky with the
> > > partitions assigned to newly created pods during all these rebalances
> i.e.
> > > the partitions assigned to pod4 in step 1 will still be retained during
> > > step 2 when another older pod gets deleted OR the partitions are
> reshuffled
> > > on each rebalance whenever older pods get deleted. So during step 2,
> when
> > > pod2 is deleted, the partitions assigned to pod4 in step 1 will also
> > > reshuffle again or it will be there and any new partitions will only be
> > > assigned?
> > >
> > >
> >
>


Re: kafka streams partition assignor strategy for version 2.5.1 - does it use sticky assignment

2023-04-14 Thread Pushkar Deole
Any inputs on below query?

On Wed, Apr 12, 2023 at 2:22 PM Pushkar Deole  wrote:

> Hi All,
>
> We are using version 2.5.1 of kafka-streams with 3 application instances
> deployed as 3 kubernetes pods.
> It consumes from multiple topics, each with 6 partitions.
> I would like to know if streams uses sticky partition assignor strategy
> internally since we can't set it externally on streams.
>
> My scenario is like this: during rolling upgrades
> Step 1: 1 new pod comes up so there are 4 pods, with some partitions
> assigned to newly created pod and k8s then deletes one of older pods, so it
> is pod1, pod2, pod3 (older) and pod4 (newer). Then pod1 is deleted. So
> ultimately pod2, pod3, pod4
>
> Step 2: K8s then repeats same for another old pod i.e. create a new pod
> and then delete old pod. So pod2, pod3, pod4, pod5 and then delete pod2. So
> ultimately pod3, pod4 and pod5
>
> The question I have here is: will kafka streams try to sticky with the
> partitions assigned to newly created pods during all these rebalances i.e.
> the partitions assigned to pod4 in step 1 will still be retained during
> step 2 when another older pod gets deleted OR the partitions are reshuffled
> on each rebalance whenever older pods get deleted. So during step 2, when
> pod2 is deleted, the partitions assigned to pod4 in step 1 will also
> reshuffle again or it will be there and any new partitions will only be
> assigned?
>
>


kafka streams partition assignor strategy for version 2.5.1 - does it use sticky assignment

2023-04-12 Thread Pushkar Deole
Hi All,

We are using version 2.5.1 of kafka-streams with 3 application instances
deployed as 3 kubernetes pods.
It consumes from multiple topics, each with 6 partitions.
I would like to know if streams uses sticky partition assignor strategy
internally since we can't set it externally on streams.

My scenario is like this: during rolling upgrades
Step 1: 1 new pod comes up so there are 4 pods, with some partitions
assigned to newly created pod and k8s then deletes one of older pods, so it
is pod1, pod2, pod3 (older) and pod4 (newer). Then pod1 is deleted. So
ultimately pod2, pod3, pod4

Step 2: K8s then repeats same for another old pod i.e. create a new pod and
then delete old pod. So pod2, pod3, pod4, pod5 and then delete pod2. So
ultimately pod3, pod4 and pod5

The question I have here is: will kafka streams try to stick with the
partitions assigned to newly created pods during all these rebalances, i.e.
will the partitions assigned to pod4 in step 1 still be retained during
step 2 when another older pod gets deleted, OR are the partitions reshuffled
on each rebalance whenever older pods get deleted? So during step 2, when
pod2 is deleted, will the partitions assigned to pod4 in step 1 also be
reshuffled again, or will they stay put and only new partitions be
assigned?


is exactly-once supported with kafka streams application with external state store like redis

2023-04-03 Thread Pushkar Deole
Hi All,

We are using a streams application with Redis for the state store.
Redis was mainly considered instead of Kafka state stores because a global
state store, once updated by one application instance, was taking a few
milliseconds to reflect the updated global state on another application
instance.
Now, we may need to enable exactly-once semantics, however we are wondering
whether it would work with a Redis state store, or whether rollbacks would
still leave stale state in Redis?
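
For reference, exactly-once is enabled on the Streams side through the
processing.guarantee config; a minimal sketch is below ("exactly_once_v2"
needs Streams 3.0+ and brokers 2.5+, older Streams versions use
"exactly_once"). This only shows how the guarantee is switched on; whether an
external store such as Redis participates in it is exactly the open question
above:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once_v2");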


can Kafka streams support ordering across 2 different topics when consuming from multiple source topics?

2023-03-21 Thread Pushkar Deole
Hi All,

We have a Kafka Streams application that consumes from 2 different topics,
say topic A and topic B. The application uses data about telephone calls on
those topics, and each call has a call id which is used as the key when
sending events to those 2 topics. E.g. for a telephone call, the 1st event
related to that call is sent to A with the call id as key, however a
subsequent event for that same call might go to topic B, again with the call
id as key.

*At times, we need to process those 2 events in order, which is not possible
with the current topology that we are using.* *Can someone suggest whether
this is possible to achieve with streams?*
The topology is as below:

Topic A has 6 partitions
Topics B has 6 partitions
Call id used as key on both topics
The Kafka Streams application has 3 instances that consume from both of the
topics as source topics.
Each streams application instance has 2 stream threads, thus in total 6 stream
threads across the 3 instances of the streams application cater to the 6
partitions of the input topics.


Re: same keys appearing in state stores on different pods when using branches in kafka streams

2022-12-08 Thread Pushkar Deole
Thanks John R.

I am adding John Brackin to this thread, who can provide further details of
the topology description.
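
For reference, the topology description John asks for in the reply below can
be printed from the application itself; a minimal sketch, assuming a
StreamsBuilder-based setup:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;

StreamsBuilder builder = new StreamsBuilder();
// ... build the branches and state stores as usual ...
Topology topology = builder.build();
// Paste this output into https://zz85.github.io/kafka-streams-viz/ to see which
// sub-topologies (and therefore tasks) the two state stores end up in.
System.out.println(topology.describe());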

On Tue, Dec 6, 2022 at 8:28 AM John Roesler  wrote:

> Hi Pushkar,
>
> I'm sorry for the delay. I'm afraid I'm having trouble picturing the
> situation. Can you provide the topology description? That will show us
> whether we should expect the stores to always be in the same instances or
> not. If you can also include a simplified version of your program, we might
> be able to provide some suggestions.
>
> Thanks,
> -John
>
> On Mon, Dec 5, 2022, at 10:52, Pushkar Deole wrote:
> > John or Matthias
> >
> > can you help here, we are frequently getting errors like below:
> >
> > org.apache.kafka.streams.errors.InvalidStateStoreException: The state
> > store, records, may have migrated to another instance.
> >
> > For the same key, the record exist in totals state store but not in
> > 'records' state store.
> >
> > John,
> >
> > can you provide more details on the groupBy option?
> >
> > On Tue, Nov 29, 2022 at 12:24 PM Pushkar Deole 
> wrote:
> >
> >> Hi John,
> >>
> >> I am not sure I understood it correctly, even with branching that uses a
> >> different state store, the key of incoming event is still the same, so
> we
> >> expect it to land in the local state store on the same pod.
> >> e.g. an event with OPEN status, with key xyz came in and processed
> through
> >> one branch and it is stored in state store 'totals', state maintained on
> >> local state store on same pod
> >> 2nd event with OPEN status, with key xyz came in and again processed and
> >> stored in 'totals'. State maintained on local state store on same pod
> >>
> >> 3rd event with CLOSED status, with key xyz came in and processed. The
> >> state is stored in 'record' state store, it is expected to be stored in
> >> state store on same pod.
> >> Why it would go to some other pod?
> >>
> >> On Wed, Nov 23, 2022 at 8:50 PM John Roesler 
> wrote:
> >>
> >>> Hi Pushkar,
> >>>
> >>> Thanks for the question. I think that what’s happening is that, even
> >>> though both branches use the same grouping logic, Streams can’t detect
> that
> >>> they are the same. It just sees two group-bys and therefore introduces
> two
> >>> repartitions, with a separate downstream task for each.
> >>>
> >>> You might want to print out the topology description and visualize it
> >>> with https://zz85.github.io/kafka-streams-viz/ . That will show you
> >>> whether the stores wind up in the same task or not.
> >>>
> >>> The visualization will also show you the names of the input topics for
> >>> those two partitions, which you can use in conjunction with the
> metadata
> >>> methods on your KafkaStreams instance to query for the location of the
> keys
> >>> in both stores.
> >>>
> >>> I suspect that with some tweaks you can re-write the topology to just
> >>> have one downstream task, if that’s what you prefer.
> >>>
> >>> By the way, I think you could propose to add an optimization to make
> the
> >>> groupBy behave the way you expected. If that’s interesting to you, let
> us
> >>> know and we can give you some pointers!
> >>>
> >>> I hope this helps,
> >>> John
> >>>
> >>> On Wed, Nov 23, 2022, at 05:36, Pushkar Deole wrote:
> >>> > Hi All,
> >>> >
> >>> > I have a stream application that creates 2 branches.  Each branch
> >>> includes
> >>> > a state store where the status field of the kafka message determines
> the
> >>> > branch, and therefore the state store used:
> >>> >
> >>> > Status OPEN = State store name totals
> >>> >
> >>> > Status CLOSED = State store name records
> >>> >
> >>> >
> >>> >
> >>> > I’m seeing that the streams application is running on a pod; however
> I’m
> >>> > getting the exception:
> >>> >
> >>> > org.apache.kafka.streams.errors.InvalidStateStoreException: The state
> >>> > store, records, may have migrated to another instance.
> >>> >
> >>> >
> >>> >
> >>> > If I physically access the pod and check the Rocksdb folders I do not
> >>> see

Re: same keys appearing in state stores on different pods when using branches in kafka streams

2022-12-05 Thread Pushkar Deole
John or Matthias

can you help here, we are frequently getting errors like below:

org.apache.kafka.streams.errors.InvalidStateStoreException: The state
store, records, may have migrated to another instance.

For the same key, the record exists in the totals state store but not in
the 'records' state store.

John,

can you provide more details on the groupBy option?
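
For checking whether a key's active store is on the local pod before querying
it (the metadata methods John refers to in the reply below), a sketch might
look like the following; it assumes application.server is configured on each
instance, a String key, and uses the store name from this thread
(queryMetadataForKey() exists from Kafka Streams 2.5; the accessor is
activeHost() from 2.7 and getActiveHost() before that):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.state.HostInfo;

// kafkaStreams is the running KafkaStreams instance; myHostInfo is this pod's
// own host:port as configured in application.server
KeyQueryMetadata meta =
        kafkaStreams.queryMetadataForKey("records", key, Serdes.String().serializer());
if (meta.activeHost().equals(myHostInfo)) {
    // the active copy of this key's partition is local: safe to query the local store
} else {
    // otherwise forward the lookup to meta.activeHost() instead of treating it as an error
}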

On Tue, Nov 29, 2022 at 12:24 PM Pushkar Deole  wrote:

> Hi John,
>
> I am not sure I understood it correctly, even with branching that uses a
> different state store, the key of incoming event is still the same, so we
> expect it to land in the local state store on the same pod.
> e.g. an event with OPEN status, with key xyz came in and processed through
> one branch and it is stored in state store 'totals', state maintained on
> local state store on same pod
> 2nd event with OPEN status, with key xyz came in and again processed and
> stored in 'totals'. State maintained on local state store on same pod
>
> 3rd event with CLOSED status, with key xyz came in and processed. The
> state is stored in 'record' state store, it is expected to be stored in
> state store on same pod.
> Why it would go to some other pod?
>
> On Wed, Nov 23, 2022 at 8:50 PM John Roesler  wrote:
>
>> Hi Pushkar,
>>
>> Thanks for the question. I think that what’s happening is that, even
>> though both branches use the same grouping logic, Streams can’t detect that
>> they are the same. It just sees two group-bys and therefore introduces two
>> repartitions, with a separate downstream task for each.
>>
>> You might want to print out the topology description and visualize it
>> with https://zz85.github.io/kafka-streams-viz/ . That will show you
>> whether the stores wind up in the same task or not.
>>
>> The visualization will also show you the names of the input topics for
>> those two partitions, which you can use in conjunction with the metadata
>> methods on your KafkaStreams instance to query for the location of the keys
>> in both stores.
>>
>> I suspect that with some tweaks you can re-write the topology to just
>> have one downstream task, if that’s what you prefer.
>>
>> By the way, I think you could propose to add an optimization to make the
>> groupBy behave the way you expected. If that’s interesting to you, let us
>> know and we can give you some pointers!
>>
>> I hope this helps,
>> John
>>
>> On Wed, Nov 23, 2022, at 05:36, Pushkar Deole wrote:
>> > Hi All,
>> >
>> > I have a stream application that creates 2 branches.  Each branch
>> includes
>> > a state store where the status field of the kafka message determines the
>> > branch, and therefore the state store used:
>> >
>> > Status OPEN = State store name totals
>> >
>> > Status CLOSED = State store name records
>> >
>> >
>> >
>> > I’m seeing that the streams application is running on a pod; however I’m
>> > getting the exception:
>> >
>> > org.apache.kafka.streams.errors.InvalidStateStoreException: The state
>> > store, records, may have migrated to another instance.
>> >
>> >
>> >
>> > If I physically access the pod and check the Rocksdb folders I do not
>> see
>> > the state store folder.  If I check the keys in the totals state store
>> on
>> > this pod, I can find the key in the records state store on another pod.
>> I
>> > had assumed that because the key of the events are the same, the same
>> > partition would be used for the two branches and therefore the same
>> keys in
>> > these two state store would be created on the same Kubernetes pod.
>> This is
>> > not an issue for the Kafka stream, but that assumption was used in the
>> way
>> > the state stores are read.  I assumed if I found the key in the 'totals'
>> > state store, the same key would be found on the same pod in the
>> 'records'
>> > state store.
>> >
>> >
>> >
>> > The questions I have are:
>> >
>> > 1) Is it expected that the state stores can hold the partition data on
>> > different pods, and is this unique to streams using branch?
>> >
>> > 2) Is there a way to know if the state store is on the pod to avoid
>> > handling this as an exception?
>> >
>> >
>> >
>> > Here is the topology of the stream in question:
>> >
>> > KStream[] branches =
>> stream
>> >
>> > .peek(receivingEventLogger)
>> >

Re: same keys appearing in state stores on different pods when using branches in kafka streams

2022-11-28 Thread Pushkar Deole
Hi John,

I am not sure I understood correctly: even with branching that uses a
different state store, the key of the incoming event is still the same, so we
expect it to land in the local state store on the same pod.
E.g. an event with OPEN status, with key xyz, came in and was processed through
one branch and stored in the 'totals' state store, with state maintained in the
local state store on the same pod.
A 2nd event with OPEN status, with key xyz, came in and was again processed and
stored in 'totals', with state maintained in the local state store on the same
pod.

A 3rd event with CLOSED status, with key xyz, came in and was processed. The
state is stored in the 'records' state store, and it is expected to be stored
in the state store on the same pod.
Why would it go to some other pod?

On Wed, Nov 23, 2022 at 8:50 PM John Roesler  wrote:

> Hi Pushkar,
>
> Thanks for the question. I think that what’s happening is that, even
> though both branches use the same grouping logic, Streams can’t detect that
> they are the same. It just sees two group-bys and therefore introduces two
> repartitions, with a separate downstream task for each.
>
> You might want to print out the topology description and visualize it with
> https://zz85.github.io/kafka-streams-viz/ . That will show you whether
> the stores wind up in the same task or not.
>
> The visualization will also show you the names of the input topics for
> those two partitions, which you can use in conjunction with the metadata
> methods on your KafkaStreams instance to query for the location of the keys
> in both stores.
>
> I suspect that with some tweaks you can re-write the topology to just have
> one downstream task, if that’s what you prefer.
>
> By the way, I think you could propose to add an optimization to make the
> groupBy behave the way you expected. If that’s interesting to you, let us
> know and we can give you some pointers!
>
> I hope this helps,
> John
>
> On Wed, Nov 23, 2022, at 05:36, Pushkar Deole wrote:
> > Hi All,
> >
> > I have a stream application that creates 2 branches.  Each branch
> includes
> > a state store where the status field of the kafka message determines the
> > branch, and therefore the state store used:
> >
> > Status OPEN = State store name totals
> >
> > Status CLOSED = State store name records
> >
> >
> >
> > I’m seeing that the streams application is running on a pod; however I’m
> > getting the exception:
> >
> > org.apache.kafka.streams.errors.InvalidStateStoreException: The state
> > store, records, may have migrated to another instance.
> >
> >
> >
> > If I physically access the pod and check the Rocksdb folders I do not see
> > the state store folder.  If I check the keys in the totals state store on
> > this pod, I can find the key in the records state store on another pod. I
> > had assumed that because the key of the events are the same, the same
> > partition would be used for the two branches and therefore the same keys
> in
> > these two state store would be created on the same Kubernetes pod.  This
> is
> > not an issue for the Kafka stream, but that assumption was used in the
> way
> > the state stores are read.  I assumed if I found the key in the 'totals'
> > state store, the same key would be found on the same pod in the 'records'
> > state store.
> >
> >
> >
> > The questions I have are:
> >
> > 1) Is it expected that the state stores can hold the partition data on
> > different pods, and is this unique to streams using branch?
> >
> > 2) Is there a way to know if the state store is on the pod to avoid
> > handling this as an exception?
> >
> >
> >
> > Here is the topology of the stream in question:
> >
> > KStream[] branches =
> stream
> >
> > .peek(receivingEventLogger)
> >
> > .selectKey(keyMapper)
> >
> > .mapValues(totalsValueMapper)
> >
> > .filter(nullKeyValueEventFilter)
> >
> > .branch((k, v) -> (RecordStatus.CLOSED.name
> > ().equalsIgnoreCase(v.getCurrent().getRecordStatus())
> >
> > || RecordStatus.LB_RDELETE.name
> > ().equalsIgnoreCase(v.getCurrent().getRecordStatus())),
> >
> > (k, v) -> true);
> >
> >
> >
> > // CLOSED and LB_RDELETE branch writes to records state store
> >
> > branches[0]
> >
> > .groupByKey(Grouped.with(Serdes.String(), totalsValueSerde))
> >
> > .aggregate(totalsInitializer, totalsAggregator,
> > materializedRecords)

same keys appearing in state stores on different pods when using branches in kafka streams

2022-11-23 Thread Pushkar Deole
Hi All,

I have a stream application that creates 2 branches.  Each branch includes
a state store where the status field of the kafka message determines the
branch, and therefore the state store used:

Status OPEN = State store name totals

Status CLOSED = State store name records



I’m seeing that the streams application is running on a pod; however I’m
getting the exception:

org.apache.kafka.streams.errors.InvalidStateStoreException: The state
store, records, may have migrated to another instance.



If I physically access the pod and check the RocksDB folders, I do not see
the state store folder. If I check the keys in the totals state store on
this pod, I can find the key in the records state store on another pod. I
had assumed that because the key of the events is the same, the same
partition would be used for the two branches and therefore the same keys in
these two state stores would be created on the same Kubernetes pod. This is
not an issue for the Kafka stream, but that assumption was used in the way
the state stores are read. I assumed that if I found the key in the 'totals'
state store, the same key would be found on the same pod in the 'records'
state store.



The questions I have are:

1) Is it expected that the state stores can hold the partition data on
different pods, and is this unique to streams using branch?

2) Is there a way to know if the state store is on the pod to avoid
handling this as an exception?



Here is the topology of the stream in question:

KStream[] branches = stream
        .peek(receivingEventLogger)
        .selectKey(keyMapper)
        .mapValues(totalsValueMapper)
        .filter(nullKeyValueEventFilter)
        .branch((k, v) -> (RecordStatus.CLOSED.name().equalsIgnoreCase(v.getCurrent().getRecordStatus())
                        || RecordStatus.LB_RDELETE.name().equalsIgnoreCase(v.getCurrent().getRecordStatus())),
                (k, v) -> true);

// CLOSED and LB_RDELETE branch writes to records state store
branches[0]
        .groupByKey(Grouped.with(Serdes.String(), totalsValueSerde))
        .aggregate(totalsInitializer, totalsAggregator, materializedRecords)
        .toStream()
        .map(totalsInternalKeyValueMapper)
        .filter(nullKeyStringValueEventFilter)
        .to(loopbackTopic.name());

// DEFAULT branch writes to totals state store
branches[1]
        .groupByKey(Grouped.with(Serdes.String(), totalsValueSerde))
        .aggregate(totalsInitializer, totalsAggregator, materializedTotals)
        .toStream()
        .flatMap(totalsKeyValueMapper)
        .filter(nullKeyStringValueEventFilter)
        .peek(sendingEventLogger)
        .to(toTopic.name());


Re: Kafka consumer ensure offsets committed before partition rebalance to avoid duplicates

2022-09-20 Thread Pushkar Deole
Hi Luke,

Thanks for the details... so from the explanation above, it seems that in both
of these scenarios I won't be able to avoid duplicate processing, which is the
main thing I was looking to achieve:

scenario 1: the consumer shuts down and doesn't commit the offsets of the
already polled and processed batch of records (since auto.commit is enabled,
which would commit on the next poll, which won't occur when closing the
consumer). This gives rise to duplicate processing of that batch when the
partition is rebalanced to another consumer pod.

scenario 2: the CooperativeStickyAssignor keeps working on the partition before
rebalancing, which means the same thing again, i.e. consumer1 has polled and
processed some records which are not yet committed before rebalancing, and
when the partition moves over to the next consumer, it can process those
records again.
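
For reference, the cooperative protocol Luke contrasts with the eager one
below is selected on the plain consumer with a single config (a sketch; other
settings unchanged):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;

props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
          CooperativeStickyAssignor.class.getName());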

On Wed, Sep 21, 2022 at 7:32 AM Luke Chen  wrote:

> Hi
>
> 1. I was under impression, from documentation, that close method waits for
> 30 seconds to complete processing of any in-flight events and then commits
> offsets of last poll. Isn't that true? what does timeout of 30 seconds
> mean?
>
> -> 30 seconds timeout is to have a buffer for graceful closing, ex: commit
> offsets, leave groups,...
> It won't complete processing any in-flight "fetch" events during closing.
>
> 2. How does CooperativeStickyAssignor solve my problem when partitions move
> out to newly added consumer pod. i.e. consumer1 has polled 100 records from
> partition1 and is midway processing those i.e. 50 completed, 50 remaining
> and new consumer is added so partition1 has to move to new consumer2. Since
> auto.commit is enabled, offsets of all 100 polled records will be committed
> only during next poll.
>
> -> You're right about the process.
>
> So how does CooperativeStickyAssignor help here to
> wait for processing of the 100 records and commit their offsets before moving
> the partition to the new consumer? Looks like I am missing something.
>
> -> CooperativeStickyAssignor does the same thing, except it will
> keep all the partitions "during rebalancing".
> So, the issue is:
> In the eager protocol (ex: RangeAssignor):
> consumer prepare rebalancing -> commit offsets -> revoke all owned
> partitions -> rebalancing -> received new assignment -> start fetch data
> In the cooperative protocol (ex: CooperativeStickyAssignor):
> consumer prepare rebalancing -> commit offsets (but no revoke) ->
> rebalancing -> received new assignment -> revoke partitions not owned
> anymore
>
> So you can see, in cooperative protocol, since it didn't revoke any
> partition before rebalancing, it might fetch more data after offset
> commits.
>
> Hope that's clear
> Luke
>
> On Tue, Sep 20, 2022 at 9:36 PM Pushkar Deole 
> wrote:
>
> > Thanks Luke..
> >
> > 1. I was under impression, from documentation, that close method waits
> for
> > 30 seconds to complete processing of any in-flight events and then
> commits
> > offsets of last poll. Isn't that true? what does timeout of 30 seconds
> > mean?
> >
> > 2. How does CoperativeStickyAssignor solve my problem when partitions
> move
> > out to newly added consumer pod. i.e. consumer1 has polled 100 records
> from
> > partition1 and is midway processing those i.e. 50 completed, 50 remaining
> > and new consumer is added so partition1 has to move to new consumer2.
> Since
> > auto.commit is enabled, offsets of all 100 polled records will be
> committed
> > only during next poll. So how does CooperativeStickyAssignore help here
> to
> > wait for process of 100 records and commit their offsets before moving
> the
> > partition to new consumer? Looks like i am missing something
> >
> > On Fri, Sep 16, 2022 at 7:59 AM Luke Chen  wrote:
> >
> > > Hi Pushkar,
> > >
> > > Here's the answer to your questions:
> > >
> > > > 1. During scale-down operation, I am adding a shutdown hook to the
> Java
> > > Runtime, and calling close on the consumer. As per kafka docs, close
> > > provides 30 sec to commit current offsets if auto.commit is enabled:
> so,
> > i
> > > assume that it will process the current batch of polled records within
> 30
> > > sec timeout

Re: Kafka consumer ensure offsets committed before partition rebalance to avoid duplicates

2022-09-20 Thread Pushkar Deole
Thanks Luke..

1. I was under the impression, from the documentation, that the close method
waits for up to 30 seconds to complete processing of any in-flight events and
then commits the offsets of the last poll. Isn't that true? What does the
timeout of 30 seconds mean?

2. How does CooperativeStickyAssignor solve my problem when partitions move
out to a newly added consumer pod? I.e. consumer1 has polled 100 records from
partition1 and is midway through processing those, i.e. 50 completed, 50
remaining, and a new consumer is added so partition1 has to move to the new
consumer2. Since auto.commit is enabled, the offsets of all 100 polled records
will be committed only during the next poll. So how does
CooperativeStickyAssignor help here to wait for processing of the 100 records
and commit their offsets before moving the partition to the new consumer?
Looks like I am missing something.

On Fri, Sep 16, 2022 at 7:59 AM Luke Chen  wrote:

> Hi Pushkar,
>
> Here's the answer to your questions:
>
> > 1. During scale-down operation, I am adding a shutdown hook to the Java
> Runtime, and calling close on the consumer. As per kafka docs, close
> provides 30 sec to commit current offsets if auto.commit is enabled: so, i
> assume that it will process the current batch of polled records within 30
> sec timeout before committing offsets and then close the consumer. Is my
> understanding correct?
>
> No, close() method is only doing some cleanup and offset commit if needed.
> It won't care if the polled records are processed or not.
> So, to be clear, the 30 seconds is for consumer to do:
> (1) commit offset if auto.commit is enabled (2) leave consumer group (3)
> other cleanup
>
> > 2. During scale out operation, new pod (consumer) will be added to the
> consumer group, so partitions of existing consumers will be rebalanced to
> new consumer. In this case, I want to ensure that the current batch of
> records polled and being processed by the consumer is processed and offsets
> are committed before partition rebalance happens to new consumer.
> How can I ensure this with auto-commit enabled?
>
> It depends on which version of Kafka you're running, and which
> `partition.assignment.strategy` you are setting.
> In Kafka v3.2.1, we found a bug that it'll have chance to process duplicate
> records during rebalance: KAFKA-14196
> <https://issues.apache.org/jira/browse/KAFKA-14196>
> So, assuming you're using default `partition.assignment.strategy` setting,
> and not in v3.2.1, we can ensure it will not have duplicated consumption.
> If you set the `partition.assignment.strategy` to
> cooperativeStickyAssignor, there's a bug that we're still working on:
> KAFKA-14224 <https://issues.apache.org/jira/browse/KAFKA-14224>
>
> Thank you.
> Luke
>
> On Wed, Sep 14, 2022 at 3:09 PM Pushkar Deole 
> wrote:
>
> > Hi All,
> >
> > I am hosting kafka consumers inside microservice hosted as kubernetes
> pods,
> > 3 consumers in a consumer group.
> > There is a requirement to add auto-scaling where there will be a single
> pod
> > which will be auto-scaled out or scaled-in based on the load on
> > microservice.
> > So, 1 pod can be scaled out to 2 or 3 pods, and similarly 3 pods can be
> > scaled down to 2 or 1 pod.
> >
> > Currently, I am using enabled.auto.commit set to 'true' in the consumers
> > and during scale out or scale-in, i want to commit offset of polled and
> > processed records so duplicates won't occur.
> > I have narrowed the problem to 2 scenarios:
> >
> > 1. During scale-down operation, I am adding a shutdown hook to the Java
> > Runtime, and calling close on the consumer. As per kafka docs, close
> > provides 30 sec to commit current offsets if auto.commit is enabled: so,
> i
> > assume that it will process the current batch of polled records within 30
> > sec timeout before committing offsets and then close the consumer. Is my
> > understanding correct?
> >
> > public void close()
> >
> > Close the consumer, waiting for up to the default timeout of 30 seconds
> for
> > any needed cleanup. If auto-commit is enabled, this will commit the
> current
> > offsets if possible within the default timeout. See close(Duration)
> > <
> >
> https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#close-java.time.Duration-
> > >
> > for
> > details. Note that wakeup()
> > <
> >
> https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#wakeup--
> > >
> > cannot
> > be used to interrupt close.
> >
> > 2. During scale out operation, new pod (consumer) will be added to the
> > consumer group, so partitions of existing consumers will be rebalanced to
> > new consumer. In this case, I want to ensure that the current batch of
> > records polled and being processed by the consumer is processed and
> offsets
> > are committed before partition rebalance happens to new consumer.
> > How can I ensure this with auto-commit enabled?
> >
>


Kafka consumer ensure offsets committed before partition rebalance to avoid duplicates

2022-09-14 Thread Pushkar Deole
Hi All,

I am hosting Kafka consumers inside a microservice deployed as Kubernetes pods,
with 3 consumers in a consumer group.
There is a requirement to add auto-scaling, where a single pod
will be scaled out or scaled in based on the load on the
microservice.
So, 1 pod can be scaled out to 2 or 3 pods, and similarly 3 pods can be
scaled down to 2 or 1 pod.

Currently, I am using enable.auto.commit set to 'true' in the consumers,
and during scale-out or scale-in I want to commit the offsets of polled and
processed records so duplicates won't occur.
I have narrowed the problem down to 2 scenarios:

1. During a scale-down operation, I am adding a shutdown hook to the Java
Runtime and calling close on the consumer. As per the Kafka docs, close
allows up to 30 sec to commit the current offsets if auto-commit is enabled;
so I assume that it will process the current batch of polled records within
the 30 sec timeout, commit the offsets and then close the consumer. Is my
understanding correct?

public void close()

Close the consumer, waiting for up to the default timeout of 30 seconds for
any needed cleanup. If auto-commit is enabled, this will commit the current
offsets if possible within the default timeout. See close(Duration)
for details. Note that wakeup()
cannot be used to interrupt close.

2. During a scale-out operation, a new pod (consumer) will be added to the
consumer group, so some partitions of existing consumers will be rebalanced to
the new consumer. In this case, I want to ensure that the current batch of
records polled and being processed by a consumer is fully processed and its
offsets are committed before the partitions are rebalanced to the new consumer.
How can I ensure this with auto-commit enabled?
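
A minimal sketch of one way to cover both scenarios, assuming the poll loop is
switched to manual commits (an alternative to auto-commit): a shutdown hook
stops the loop and closes the consumer, and a ConsumerRebalanceListener commits
before partitions are revoked. The bootstrap servers, group id, topic name and
handleRecord step are placeholders, not anything from the original posts.

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class GracefulConsumer {
    private static final AtomicBoolean running = new AtomicBoolean(true);

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");                // assumption
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");         // commit manually at safe points
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        Thread mainThread = Thread.currentThread();

        // Scale-down: on SIGTERM, stop the poll loop and let it close the consumer cleanly.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            running.set(false);
            consumer.wakeup();              // interrupts a blocked poll()
            try { mainThread.join(); } catch (InterruptedException ignored) { }
        }));

        // Scale-out: commit offsets of fully processed records before partitions move away.
        consumer.subscribe(Collections.singletonList("my-topic"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                consumer.commitSync();      // runs inside poll(), before the new assignment takes effect
            }
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) { }
        });

        try {
            while (running.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    handleRecord(record);   // hypothetical processing step
                }
                consumer.commitSync();      // commit only after the whole batch is processed
            }
        } catch (org.apache.kafka.common.errors.WakeupException e) {
            // expected on shutdown
        } finally {
            consumer.close(Duration.ofSeconds(30));
        }
    }

    private static void handleRecord(ConsumerRecord<String, String> record) {
        // placeholder for the application's per-record processing
    }
}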


Re: source topic partitions not assigned evenly to kafka stream threads

2022-06-16 Thread Pushkar Deole
@Matthias

Can you help with this? I remember having a conversation with you in the
past on this topic, wherein it was mentioned that the partition-to-stream-task
assignment logic might change in future releases.

On Mon, Jun 13, 2022, 11:05 Pushkar Deole  wrote:

> Hi,
>
> I have a microservice hosting a kafka streams application. I have 3
> instances of the microservice hosted in 3 pods, each having 2 kafka stream
> threads, thus 6 stream threads in total as part of the consumer group.
> There are 3 source topics: A, B, C each having 12, 6, 6 partitions
> respectively i.e. total 24 source topic partitions.
>
> Now, the issue I am facing is the distribution of source topic partitions
> among stream threads. With 6 stream threads and 24 topic partitions
> overall, each stream thread is assigned 4 partitions, so no issue
> there. However, the main issue is which topics those partitions come
> from. So, sometimes, I am getting 4 partitions
> from Topic A assigned to stream thread 1, while another stream thread will
> get partitions from Topic B and C only.
>
> What I am expecting is, partitions from each topic will get evenly
> distributed among 6 threads, so each stream thread should get 2 partitions
> of topic A, and 1 partition of topic B and C each, thus making 4 partitions
> of input topics. This is required for proper load balancing since topic A
> carries a lot more event traffic than topic B and C due to which partitions
> of each topic should be distributed evenly across all stream threads.
>
> I am using kafka stream 2.5.1
> Is this the default behavior of kafka streams 2.5.1 client? Is there any
> configuration to change this behaviour?
> Is there any change in logic in later versions of streams that would
> guarantee the distribution the way I described above?
>


source topic partitions not assigned evenly to kafka stream threads

2022-06-12 Thread Pushkar Deole
Hi,

I have a microservice hosting a kafka streams application. I have 3 instances
of the microservice hosted in 3 pods, each having 2 kafka stream threads,
thus 6 stream threads in total as part of the consumer group.
There are 3 source topics: A, B, C each having 12, 6, 6 partitions
respectively i.e. total 24 source topic partitions.

Now, the issue I am facing is the distribution of source topic partitions
among stream threads. With 6 stream threads and 24 topic partitions
overall, each stream thread is assigned 4 partitions, so no issue
there. However, the main issue is which topics those partitions come
from. So, sometimes, I am getting 4 partitions
from Topic A assigned to stream thread 1, while another stream thread will
get partitions from Topic B and C only.

What I am expecting is, partitions from each topic will get evenly
distributed among 6 threads, so each stream thread should get 2 partitions
of topic A, and 1 partition of topic B and C each, thus making 4 partitions
of input topics. This is required for proper load balancing since topic A
carries a lot more event traffic than topic B and C due to which partitions
of each topic should be distributed evenly across all stream threads.

I am using kafka stream 2.5.1
Is this the default behavior of kafka streams 2.5.1 client? Is there any
configuration to change this behaviour?
Is there any change in logic in later versions of streams that would
guarantee the distribution the way I described above?
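
For checking what the assignor actually did on a given instance, a small sketch
against the 2.5.x API, assuming 'streams' is an already started KafkaStreams
instance; it prints which source-topic partitions each stream thread received.

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.streams.processor.ThreadMetadata;

public class AssignmentLogger {
    // Assumes 'streams' is an already started KafkaStreams instance (2.5.x API).
    public static void logAssignment(KafkaStreams streams) {
        for (ThreadMetadata thread : streams.localThreadsMetadata()) {
            System.out.println("stream thread " + thread.threadName());
            for (TaskMetadata task : thread.activeTasks()) {
                // topicPartitions() shows exactly which source partitions this thread received
                System.out.println("  task " + task.taskId() + " -> " + task.topicPartitions());
            }
        }
    }
}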


Re: kafka producer exception due to TimeoutException: Expiring records for topic 120000ms has passed since batch creation

2022-04-05 Thread Pushkar Deole
Liam,

This is left at the default; we have not changed these configurations. So,
per the kafka docs, the default value of linger.ms for the producer is 0.

On Tue, Apr 5, 2022 at 4:42 AM Liam Clarke-Hutchinson 
wrote:

> Hi Pushkar,
>
> Could be a lot of things. What's your linger.ms configured for?
>
> Cheers,
>
> Liam
>
> On Tue, 5 Apr 2022 at 05:39, Pushkar Deole  wrote:
>
> > Hi All,
> >
> > We are intermittently seeing KafkaProducerException. The nested exception
> > is as below:
> >
> > org.springframework.kafka.core.KafkaProducerException: Failed to send;
> > nested exception is org.apache.kafka.common.errors.TimeoutException:
> > Expiring 10 record(s) for analytics.mpe.passthrough-0:120000 ms has passed
> > since batch creation\n\tat
> > org.springframework.kafka.core.KafkaTemplate.lambda$buildCallback$4(
> > KafkaTemplate.java:602)
> >
> > Kafka version is 2.5
> > Can someone give some ideas as to what would cause this and how can this
> be
> > resolved?
> >
>


kafka producer exception due to TimeoutException: Expiring records for topic 120000ms has passed since batch creation

2022-04-04 Thread Pushkar Deole
Hi All,

We are intermittently seeing KafkaProducerException. The nested exception
is as below:

org.springframework.kafka.core.KafkaProducerException: Failed to send;
nested exception is org.apache.kafka.common.errors.TimeoutException:
Expiring 10 record(s) for analytics.mpe.passthrough-0:120000 ms has passed
since batch creation\n\tat
org.springframework.kafka.core.KafkaTemplate.lambda$buildCallback$4(
KafkaTemplate.java:602)

Kafka version is 2.5
Can someone give some ideas as to what would cause this and how can this be
resolved?
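
The error means batches sat in the producer's buffer longer than
delivery.timeout.ms (120000 ms by default) without being acknowledged. A short
sketch of the producer settings usually reviewed for this error; the broker
address and the values shown are illustrative assumptions, not recommendations.

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class ProducerTimeoutTuning {
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");   // assumption
        // Total time a record may wait (batching + retries) before it is expired
        // with exactly this TimeoutException; 120000 ms is the default.
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);
        // Per-request timeout to the broker; should be well below delivery.timeout.ms.
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        // 0 (the default) sends as soon as the sender thread can; raising it increases batching.
        props.put(ProducerConfig.LINGER_MS_CONFIG, 0);
        return props;
    }
}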


if kafka producer client app crashes, does kafka server cleanup server side resources

2021-11-29 Thread Pushkar Deole
Hi All,

I am wondering what would happen if the producer app crashes without
calling producer.close. In this case, would the kafka server take care of
cleaning up the resources allocated on the kafka server/broker for those producers?
Or would those resources be leaked on the server side, and how does the kafka
server handle this?


Re: uneven distribution of events across kafka topic partitions for small number of unique keys

2021-11-22 Thread Pushkar Deole
Dave,

I am not sure I get your point... it is not about having fewer partitions; the
issue is that the default partitioner can produce the same hash for 2
different strings, which may land the 2 different keys in the same
partition.

On Sun, Nov 21, 2021 at 9:33 PM Dave Klein  wrote:

> Another possibility, if you can pause processing, is to create a new topic
> with the higher number of partitions, then consume from the beginning of
> the old topic and produce to the new one. Then continue processing as
> normal and all events will be in the correct partitions.
>
> Regards,
> Dave
>
> > On Nov 21, 2021, at 7:38 AM, Pushkar Deole  wrote:
> >
> > Thanks Luke, I am sure this problem would have been faced by many others
> > before so would like to know if there are any existing custom algorithms
> > that can be reused,
> >
> > Note that we also have requirement to maintain key level ordering,  so
> the
> > custom partitioner should support that as well
> >
> >> On Sun, Nov 21, 2021, 18:29 Luke Chen  wrote:
> >>
> >> Hello Pushkar,
> >> Default distribution algorithm is by "hash(key) % partition_count", so
> >> there's possibility to have the uneven distribution you saw.
> >>
> >> Yes, there's a way to solve your problem: custom partitioner:
> >>
> https://kafka.apache.org/documentation/#producerconfigs_partitioner.class
> >>
> >> You can check the partitioner javadoc here
> >> <
> >>
> https://kafka.apache.org/30/javadoc/org/apache/kafka/clients/producer/Partitioner.html
> >>>
> >> for reference. You can see some examples from built-in partitioners, ex:
> >>
> >>
> clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java.
> >> Basically, you want to focus on the "partition" method, to define your
> own
> >> algorithm to distribute the keys based on the events, ex: key-1 ->
> >> partition-1, key-2 -> partition-2... etc.
> >>
> >> Thank you.
> >> Luke
> >>
> >>
> >> On Sat, Nov 20, 2021 at 2:55 PM Pushkar Deole 
> >> wrote:
> >>
> >>> Hi All,
> >>>
> >>> We are experiencing some uneven distribution of events across topic
> >>> partitions for a small set of unique keys: following are the details:
> >>>
> >>> 1. topic with 6 partitions
> >>> 2. 8 unique keys used to produce events onto the topic
> >>>
> >>> Used 'key' based partitioning while producing events onto the above
> topic
> >>> Observation: only 3 partitions were utilized for all the events
> >> pertaining
> >>> to those 8 unique keys.
> >>>
> >>> Any idea how can the load be even across partitions while using key
> based
> >>> partitioning strategy? Any help would be greatly appreciated.
> >>>
> >>> Note: we cannot use round robin since key level ordering matters for us
> >>>
> >>
>
>


Re: uneven distribution of events across kafka topic partitions for small number of unique keys

2021-11-21 Thread Pushkar Deole
Thanks Luke, I am sure this problem would have been faced by many others
before so would like to know if there are any existing custom algorithms
that can be reused,

Note that we also have requirement to maintain key level ordering,  so the
custom partitioner should support that as well

On Sun, Nov 21, 2021, 18:29 Luke Chen  wrote:

> Hello Pushkar,
> Default distribution algorithm is by "hash(key) % partition_count", so
> there's possibility to have the uneven distribution you saw.
>
> Yes, there's a way to solve your problem: custom partitioner:
> https://kafka.apache.org/documentation/#producerconfigs_partitioner.class
>
> You can check the partitioner javadoc here
> <
> https://kafka.apache.org/30/javadoc/org/apache/kafka/clients/producer/Partitioner.html
> >
> for reference. You can see some examples from built-in partitioners, ex:
>
> clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java.
> Basically, you want to focus on the "partition" method, to define your own
> algorithm to distribute the keys based on the events, ex: key-1 ->
> partition-1, key-2 -> partition-2... etc.
>
> Thank you.
> Luke
>
>
> On Sat, Nov 20, 2021 at 2:55 PM Pushkar Deole 
> wrote:
>
> > Hi All,
> >
> > We are experiencing some uneven distribution of events across topic
> > partitions for a small set of unique keys: following are the details:
> >
> > 1. topic with 6 partitions
> > 2. 8 unique keys used to produce events onto the topic
> >
> > Used 'key' based partitioning while producing events onto the above topic
> > Observation: only 3 partitions were utilized for all the events
> pertaining
> > to those 8 unique keys.
> >
> > Any idea how can the load be even across partitions while using key based
> > partitioning strategy? Any help would be greatly appreciated.
> >
> > Note: we cannot use round robin since key level ordering matters for us
> >
>
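
A minimal sketch of the "partition" method Luke describes, assuming a small,
known set of keys; the explicit key-to-partition table is hypothetical and would
have to match the application's own keys. Because each key always maps to the
same partition, per-key ordering is preserved.

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class ExplicitKeyPartitioner implements Partitioner {

    // Hypothetical table: spread 8 known keys evenly over 6 partitions.
    private final Map<String, Integer> keyToPartition = new HashMap<>();

    @Override
    public void configure(Map<String, ?> configs) {
        String[] keys = {"key-1", "key-2", "key-3", "key-4", "key-5", "key-6", "key-7", "key-8"};
        for (int i = 0; i < keys.length; i++) {
            keyToPartition.put(keys[i], i % 6);
        }
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        Integer assigned = keyToPartition.get(String.valueOf(key));
        // Same key always goes to the same partition, so per-key ordering is preserved;
        // unknown keys fall back to a simple hash.
        if (assigned != null) {
            return assigned % numPartitions;
        }
        return (String.valueOf(key).hashCode() & 0x7fffffff) % numPartitions;
    }

    @Override
    public void close() { }
}

The class would then be set on the producer through the partitioner.class
property mentioned in the documentation link above.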


uneven distribution of events across kafka topic partitions for small number of unique keys

2021-11-19 Thread Pushkar Deole
Hi All,

We are experiencing some uneven distribution of events across topic
partitions for a small set of unique keys: following are the details:

1. topic with 6 partitions
2. 8 unique keys used to produce events onto the topic

Used 'key' based partitioning while producing events onto the above topic
Observation: only 3 partitions were utilized for all the events pertaining
to those 8 unique keys.

Any idea how can the load be even across partitions while using key based
partitioning strategy? Any help would be greatly appreciated.

Note: we cannot use round robin since key level ordering matters for us


Producer Timeout issue in kafka streams task

2021-10-31 Thread Pushkar Deole
Hi All,

I am getting the below issue in a streams application. The Kafka cluster is a 3
broker cluster (v 2.5.1), and I could see that 2 of the 3 brokers restarted
at the same time the below exception occurred in the streams application, so I
can relate the exception to those broker restarts. However, what is
worrying me is that the streams application did not process any events after
the exception. So the questions are:
1. How can I make the streams application resilient to broker issues? E.g.
the producer underneath streams should have connected to another broker
instance when 1 broker went down, but possibly the 2nd broker went
down immediately and that's why it timed out.
2. In general, how does streams handle broker issues, and when does it decide
to connect to another broker instance in case one instance seems to be in
error?

{"@timestamp":"2021-10-30T12:19:43.486+00:00","@version":"1","message":"Exception
processing processor thread -
analytics-event-normalizer-00042391-f084-441f-95da-beb2d0242943-StreamThread-2
stream - task [0_5] Abort sending since an error caught with a previous
record (timestamp 1635596258179) to topic analytics-incoming-feed due to
org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s) for
analytics-incoming-feed-4:120000 ms has passed since batch
creation\nTimeout exception caught when sending record to topic
analytics-incoming-feed. This might happen if the producer cannot send data
to the Kafka cluster and thus, its internal buffer fills up. This can also
happen if the broker is slow to respond, if the network connection to the
broker was interrupted, or if similar circumstances arise. You can increase
producer parameter `max.block.ms` to increase this
timeout.","logger_name":"com.avaya.analytics.kafka.topology.EventNormalizerTopology","thread_name":"analytics-event-normalizer-00042391-f084-441f-95da-beb2d0242943-StreamThread-2","level":"ERROR","level_value":4,"stack_trace":"org.apache.kafka.streams.errors.StreamsException:
task [0_5] Abort sending since an error caught with a previous record
(timestamp 1635596258179) to topic analytics-incoming-feed due to
org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s) for
analytics-incoming-feed-4:120000 ms has passed since batch
creation\nTimeout exception caught when sending record to topic
analytics-incoming-feed. This might happen if the producer cannot send data
to the Kafka cluster and thus, its internal buffer fills up. This can also
happen if the broker is slow to respond, if the network connection to the
broker was interrupted, or if similar circumstances arise. You can increase
producer parameter `max.block.ms` to increase this timeout.\n\tat
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:154)\n\tat
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52)\n\tat
org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:214)\n\tat
datadog.trace.instrumentation.kafka_clients.KafkaProducerInstrumentation$ProducerCallback.onCompletion(KafkaProducerInstrumentation.java:142)\n\tat
org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1356)\n\tat
org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)\n\tat
org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:197)\n\tat
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:676)\n\tat
org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:380)\n\tat
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:323)\n\tat
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)\n\tat
java.base/java.lang.Thread.run(Unknown Source)\nCaused by:
org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s) for
analytics-incoming-feed-4:120000 ms has passed since batch creation\n"}
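
Since the exception text itself suggests raising max.block.ms, a sketch of how
producer-level settings are passed to the producer embedded in Kafka Streams,
via the producer prefix. The application id is taken from the log above; the
broker address and the values are illustrative assumptions only.

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsProducerTuning {
    public static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "analytics-event-normalizer"); // from the log above
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");             // assumption
        // Producer settings are forwarded by prefixing them; values below are illustrative only.
        props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), 120000);
        props.put(StreamsConfig.producerPrefix(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG), 180000);
        // More retries/time gives the embedded producer a chance to find the surviving broker.
        props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG), Integer.MAX_VALUE);
        return props;
    }
}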


does kafka provide REST API for producing events to topic?

2021-10-12 Thread Pushkar Deole
Hi All,

I am trying to load test our application that is a consumer of a kafka
topic.
I want to load test the application wherein my test tool (jmeter script) is
making a connection to kafka broker and then producing events onto topic
through producer API. This requires that the test client has connectivity
to the kafka broker through some VNET peering in cloud.

However, I would like to know if kafka also provides any REST API which the
test client could access (rather than making a TCP connection to the broker)
and invoke to produce events onto topics?


Re: kafka streams commit.interval.ms for at-least-once too high

2021-10-06 Thread Pushkar Deole
Matthias,

Good to hear on this part that kafka streams handle this internally :  "If
a rebalance/shutdown is triggered, Kafka Streams will stop processing new
records and just finish processing all in-flight records. Afterwards, a
commit happens right away for all fully processed records."

Since a regular consumer-producer doesn't support this, I guess that even in case
of a normal shutdown there could be duplicates. Is that correct, or does kafka
support a normal consumer-producer finishing in-flight processing and
committing those offsets before a rebalance/shutdown occurs?

On Wed, Oct 6, 2021 at 12:22 AM Matthias J. Sax  wrote:

> > - By producer config, i hope you mean batching and other settings that
> will
> > hold off producing of events. Correct me if i'm wrong
>
> Correct.
>
> > - Not sure what you mean by throughput here, which configuration would
> > dictate that?
>
> I referred to input topic throughput. If you have higher/lower
> throughput you might get data quicker/later depending on your producer
> configs.
>
> > - Do you mean here that the kafka streams internally handles waiting on
> > processing and offset commits of events that are already consumed and
> being
> > processed for streams instance?
>
> If a rebalance/shutdown is triggered, Kafka Streams will stop processing
> new records and just finish processing all in-flight records.
> Afterwards, a commit happens right away for all fully processed records.
>
>
> -Matthias
>
>
> On 10/5/21 8:35 AM, Pushkar Deole wrote:
> > Matthias,
> >
> > On your response "For at-least-once, you would still get output
> > continuously, depending on throughput and producer configs"
> > - Not sure what you mean by throughput here, which configuration would
> > dictate that?
> > - By producer config, i hope you mean batching and other settings that
> will
> > hold off producing of events. Correct me if i'm wrong
> >
> > On your response "For regular rebalances/restarts, a longer commit
> interval
> > has no impact because offsets would be committed right away"
> > - Do you mean here that the kafka streams internally handles waiting on
> > processing and offset commits of events that are already consumed and
> being
> > processed for streams instance?
> >
> > On Tue, Oct 5, 2021 at 11:43 AM Matthias J. Sax 
> wrote:
> >
> >> The main motivation for a shorter commit interval for EOS is
> >> end-to-end-latency. A Topology could consist of multiple sub-topologies
> >> and the end-to-end-latency for the EOS case is roughly commit-interval
> >> times number-of-subtopologies.
> >>
> >> For regular rebalances/restarts, a longer commit interval has no impact,
> >> because for a regular rebalance/restart, offsets would be committed
> >> right away to guarantee a clean hand-off. Only in case of failure, a
> >> longer commit interval can lead to larger amount of duplicates (of
> >> course only for at-least-once guarantees).
> >>
> >> For at-least-once, you would still get output continuously, depending on
> >> throughput and producer configs. Only offsets are committed each 30
> >> seconds by default. This continuous output is also the reason why there
> >> is not latency impact for at-least-once using a longer commit interval.
> >>
> >> Beside an impact on latency, there is also a throughput impact. Using a
> >> longer commit interval provides higher throughput.
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 10/4/21 7:31 AM, Pushkar Deole wrote:
> >>> Hi All,
> >>>
> >>> I am looking into the commit.interval.ms in kafka streams which says
> >> that
> >>> it is the time interval at which streams would commit offsets to source
> >>> topics.
> >>> However for exactly-once guarantee, default of this time is 100ms
> whereas
> >>> for at-least-once it is 30000ms (i.e. 30sec)
> >>> Why is there such a huge time difference between the 2 guarantees and
> >> what
> >>> does it mean to have this interval as high as 30 seconds, does it also
> >>> cause more probability of higher no. of duplicates in case of
> application
> >>> restarts or partition rebalance ?
> >>> Does it mean that the streams application would also publish events to
> >>> destination topic only at this interval which means delay in publishing
> >>> events to destinations topic ?
> >>>
> >>
> >
>


Re: kafka streams commit.interval.ms for at-least-once too high

2021-10-05 Thread Pushkar Deole
Matthias,

On your response "For at-least-once, you would still get output
continuously, depending on throughput and producer configs"
- Not sure what you mean by throughput here, which configuration would
dictate that?
- By producer config, i hope you mean batching and other settings that will
hold off producing of events. Correct me if i'm wrong

On your response "For regular rebalances/restarts, a longer commit interval
has no impact because offsets would be committed right away"
- Do you mean here that the kafka streams internally handles waiting on
processing and offset commits of events that are already consumed and being
processed for streams instance?

On Tue, Oct 5, 2021 at 11:43 AM Matthias J. Sax  wrote:

> The main motivation for a shorter commit interval for EOS is
> end-to-end-latency. A Topology could consist of multiple sub-topologies
> and the end-to-end-latency for the EOS case is roughly commit-interval
> times number-of-subtopologies.
>
> For regular rebalances/restarts, a longer commit interval has no impact,
> because for a regular rebalance/restart, offsets would be committed
> right away to guarantee a clean hand-off. Only in case of failure, a
> longer commit interval can lead to larger amount of duplicates (of
> course only for at-least-once guarantees).
>
> For at-least-once, you would still get output continuously, depending on
> throughput and producer configs. Only offsets are committed each 30
> seconds by default. This continuous output is also the reason why there
> is not latency impact for at-least-once using a longer commit interval.
>
> Beside an impact on latency, there is also a throughput impact. Using a
> longer commit interval provides higher throughput.
>
>
> -Matthias
>
>
> On 10/4/21 7:31 AM, Pushkar Deole wrote:
> > Hi All,
> >
> > I am looking into the commit.interval.ms in kafka streams which says
> that
> > it is the time interval at which streams would commit offsets to source
> > topics.
> > However for exactly-once guarantee, default of this time is 100ms whereas
> > for at-least-once it is 30000ms (i.e. 30sec)
> > Why is there such a huge time difference between the 2 guarantees and
> what
> > does it mean to have this interval as high as 30 seconds, does it also
> > cause more probability of higher no. of duplicates in case of application
> > restarts or partition rebalance ?
> > Does it mean that the streams application would also publish events to
> > destination topic only at this interval which means delay in publishing
> > events to destinations topic ?
> >
>


kafka streams commit.interval.ms for at-least-once too high

2021-10-04 Thread Pushkar Deole
Hi All,

I am looking into the commit.interval.ms in kafka streams which says that
it is the time interval at which streams would commit offsets to source
topics.
However for exactly-once guarantee, default of this time is 100ms whereas
for at-least-once it is 30000ms (i.e. 30sec)
Why is there such a huge time difference between the 2 guarantees and what
does it mean to have this interval as high as 30 seconds, does it also
cause more probability of higher no. of duplicates in case of application
restarts or partition rebalance ?
Does it mean that the streams application would also publish events to
destination topic only at this interval which means delay in publishing
events to destinations topic ?
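
For reference, a short sketch of where these settings live; the 30000 ms value
is the at-least-once default discussed above and 100 ms the exactly-once
default, with the application id and bootstrap servers as placeholders.

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class CommitIntervalConfig {
    public static Properties atLeastOnceProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");   // assumption
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");   // assumption
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.AT_LEAST_ONCE);
        // Default is 30000 ms for at-least-once; offsets are committed at this cadence,
        // while output records are still produced continuously in between commits.
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);
        // For exactly-once the default drops to 100 ms, because output only becomes
        // visible to read_committed consumers when the transaction commits:
        // props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        // props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
        return props;
    }
}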


Re: Redis as state store

2021-07-19 Thread Pushkar Deole
Sophie,

In continuation of the discussion above, now that we are using redis for
storing state in our application, which breaks the EOS of kafka streams, I
do have one question about whether we can apply a workaround here, in this
scenario:

1. Event consumed from source
2. Event processed and state stored in redis
3. Before event is sent to sink topic, the node goes down
4. Partition get rebalanced and another node consumes/processes same event
again

Even though the redis state store could be inconsistent with the rest of the streams
infrastructure like source/sink topics, can we still get an exactly-once
guarantee across the source/sink topics, where the offset commit to the source and
the send to the sink would all happen atomically or not at all? If this
is still guaranteed by kafka streams, then we can think of applying a
workaround while reading state from redis: e.g. we could apply some
conditional logic when fetching the state from redis based on a unique id of
the event, i.e. along with the state we will store the id of the event whose
processing produced that state, so when that same event is consumed
back from the source, the processor will use the state in redis as the current state
and not as the previous state, since the event has already been processed and its state
stored in redis.
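
A rough sketch of that conditional-read idea, written against a hypothetical
RedisStore interface rather than a real Redis client API: the id of the last
event applied is stored next to the state, so a replayed event is detected and
the already-updated state is returned as-is instead of being applied again.

// Hypothetical key-value abstraction over Redis; a real client (Jedis, Lettuce, ...) would differ.
interface RedisStore {
    String get(String key);
    void set(String key, String value);
}

public class IdempotentStateUpdater {

    private final RedisStore redis;

    public IdempotentStateUpdater(RedisStore redis) {
        this.redis = redis;
    }

    // eventId must be unique and carried on the event itself (an assumption of this sketch).
    public String process(String stateKey, String eventId, String eventPayload) {
        String lastAppliedId = redis.get(stateKey + ":lastEventId");
        String currentState = redis.get(stateKey);

        if (eventId.equals(lastAppliedId)) {
            // The event was already applied before the crash: reuse the stored state as-is
            // instead of applying the update a second time.
            return currentState;
        }

        String newState = applyEvent(currentState, eventPayload);  // hypothetical business logic
        redis.set(stateKey, newState);
        redis.set(stateKey + ":lastEventId", eventId);
        return newState;
    }

    private String applyEvent(String currentState, String eventPayload) {
        return eventPayload; // placeholder
    }
}

In a real client the two writes would ideally go into a single hash or a
MULTI/EXEC block so the state and the last-applied event id stay consistent
with each other.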

On Wed, Mar 24, 2021 at 4:15 AM Sophie Blee-Goldman
 wrote:

> >
> > it seems to me that local state stores might prove more performant in the
> > event of above
> > mentioned failures of streams application
>
> Note, the scenario I was describing with deleting all record from the
> remote store in event of
> failure is *not* what we do today -- I was describing a potential solution
> to the problem of EOS
> with remote storage. As of the current code, local storage is actually the
> one which would wipe
> out the state stores and need to restore from scratch. Granted, I'm
> actually not sure how this
> would be handled with EOS -- most likely we would still end up restoring
> from the changelog,
> but we just wouldn't wipe out the state stores.
>
> But the point here is that EOS with remote storage is broken -- if your use
> case requires
> exactly-once semantics, you *must* use local state stores. If you don't
> care about occasionally
> processing a record twice, or consider steady performance more important
> than this, then just
> don't use EOS in the first place.
>
> Note: Streams doesn't provide an RPC layer itself, it's up to you to
> implement, but yes Streams
> does support query state stores through RPC. But this feature (Interactive
> Queries/IQ) is for
> querying the state from outside the Kafka Streams application, it's not
> intended to be used
> from within a processor. In general remote calls inside a processor are
> somewhat discouraged,
> since it's difficult to handle failed calls. And remote calls will also
> always break EOS, if that
> matters to you.
>
> All that said, I guess you *could* use IQ to query the state stores on
> another instance from within
> a processor, but I highly doubt this would solve the problem you're facing:
> if the GlobalKTable
> isn't syncing fast enough for you then I wouldn't expect updates in another
> instance to be
> available when you need them. This depends on what you're trying to do and
> the flow of data
> on the input topics, but I get the sense there's probably a better way to
> do what you're trying to
> achieve. I'm not sure why you would want a GlobalKTable for example -- why
> not just have
> one application write the results to an output topic, and then let the
> downstream application
> read in that topic as a table? You should be able to have full control over
> things that way
>
> On Sun, Mar 21, 2021 at 5:58 AM Pushkar Deole 
> wrote:
>
> > Thanks Sophie... i was just thinking what would be a good options for us,
> > whether local state stores or redis state store, and it seems to me that
> > local state stores might prove more performant in the event of above
> > mentioned failures of streams application.
> >
> > The main reason we are thinking of moving to redis state stores is
> because:
> > we want each application to have access to state saved by other stream
> > application instance. We tried to use a GlobalKTable backed by topic and
> > each instance would save to that topic which is then synchronized to
> > GlobalKTable in each application instance. However making GlobalKTable in
> > each application instance took around 100ms and before that time the next
> > event might need to get processed by application instance in which case
> it
> > did not have proper state.
> >
> > I was also looking at some options today available with local state store
> > and came across that kafka also provides an RPC

Re: Is exactly-once possible with kafka consumer-producer ?

2021-07-16 Thread Pushkar Deole
Hi Lerh Chuan Low,

Many thanks for your response. I get it now that it provides exactly-once
semantics, i.e. it looks to the user as if each record is processed exactly once.
Also, I am clear on the aspect about the read_committed level, so the
uncommitted transaction, and hence the uncommitted send, won't be visible to
consumers.

However, one last query I have is how to make sure that, as part of the same
transaction, I am both sending and committing offsets. Which API
should I look at: is this the correct API:
KafkaProducer.sendOffsetsToTransaction?
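
A trimmed sketch of the consume-process-produce loop built around
sendOffsetsToTransaction, assuming downstream consumers use
isolation.level=read_committed; the broker address, topic names, group id,
transactional id and the toUpperCase "processing" step are placeholders.

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalPipeline {
    public static void main(String[] args) {
        Properties cProps = new Properties();
        cProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");       // assumption
        cProps.put(ConsumerConfig.GROUP_ID_CONFIG, "pipeline-group");             // assumption
        cProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");            // offsets go through the producer txn
        cProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        cProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        cProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        Properties pProps = new Properties();
        pProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");       // assumption
        pProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "pipeline-txn-1");     // unique per producer instance
        pProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        pProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cProps);
        KafkaProducer<String, String> producer = new KafkaProducer<>(pProps);
        consumer.subscribe(Collections.singletonList("source-topic"));
        producer.initTransactions();

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            if (records.isEmpty()) continue;
            producer.beginTransaction();
            try {
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                for (ConsumerRecord<String, String> rec : records) {
                    producer.send(new ProducerRecord<>("destination-topic", rec.key(), rec.value().toUpperCase()));
                    offsets.put(new TopicPartition(rec.topic(), rec.partition()),
                                new OffsetAndMetadata(rec.offset() + 1));
                }
                // The offset commit is part of the same transaction as the produced records.
                producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                producer.commitTransaction();
            } catch (Exception e) {
                producer.abortTransaction();   // neither the sends nor the offset commit become visible
            }
        }
    }
}

The catch block is simplified: fatal errors such as ProducerFencedException
would require closing the producer rather than aborting the transaction.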

On Fri, Jul 16, 2021 at 9:57 PM Lerh Chuan Low  wrote:

> Pushkar,
>
> My understanding is you can easily turn it on by using Kafka streams as
> Chris mentioned. Otherwise you'd have to do it yourself - I don't think you
> can get exactly once processing, but what you can do (which is also what
> Kafka streams does) is exactly once schematics (You won't be able to get
> every message processed exactly once in the system, but they could look
> like they had been processed exactly once), The final piece of the puzzle
> besides using idempotent producers and transactions is to set consumers of
> the downstream topic to *read_committed: true*. So in your example the
> messages would still have made it to the destination topic, however because
> the transaction has not yet been completed, the downstream consumer would
> ignore them.
>
> You can still only do exactly once processing up to the boundaries of
> Kafka, that said. Wherever Kafka terminates you'd have to code it yourself.
>
>
>
> On Sat, Jul 17, 2021 at 2:01 AM Pushkar Deole 
> wrote:
>
> > Chris,
> >
> > I am not sure how this solves the problem scenario that we are
> experiencing
> > in customer environment: the scenario is:
> > 1. application consumed record and processed it
> > 2. the processed record is produced on destination topic and ack is
> > received
> > 3. Before committing offset back to consumed topic, the application pod
> > crashed or shut down by kubernetes or shut down due to some other issue
> >
> > On Fri, Jul 16, 2021 at 8:57 PM Chris Larsen
>  > >
> > wrote:
> >
> > > It is not possible out of the box, it is something you’ll have to write
> > > yourself. Would the following work?
> > >
> > > Consume -> Produce to primary topic-> get success ack back -> commit
> the
> > > consume
> > >
> > > Else if ack fails, produce to dead letter, then commit upon success
> > >
> > > Else if dead letter ack fails, exit (and thus don’t commit)
> > >
> > > Does that help? Someone please feel free to slap my hand but seems
> legit
> > to
> > > me ;)
> > >
> > > Chris
> > >
> > >
> > >
> > > On Fri, Jul 16, 2021 at 10:48 Pushkar Deole 
> > wrote:
> > >
> > > > Thanks Chris for the response!
> > > >
> > > > The current application is quite evolved and currently using
> > > >
> > > > consumer-producer model described above and we need to fix some bugs
> > soon
> > > >
> > > > for a customer. So, moving to kafka streams seems bigger work. That's
> > why
> > > >
> > > > looking at work around if same thing can be achieved with current
> model
> > > >
> > > > using transactions that span across consumer offset commits and
> > producer
> > > >
> > > > send.
> > > >
> > > >
> > > >
> > > > We have made the producer idempotent and turned on transactions.
> > > >
> > > > However want to make offset commit to consumer and send from producer
> > to
> > > be
> > > >
> > > > atomic? Is that possible?
> > > >
> > > >
> > > >
> > > > On Fri, Jul 16, 2021 at 6:18 PM Chris Larsen
> > >  > > > >
> > > >
> > > > wrote:
> > > >
> > > >
> > > >
> > > > > Pushkar, in kafka development for customer consumer/producer you
> > handle
> > > > it.
> > > >
> > > > > However you can ensure the process stops (or sends message to dead
> > > > letter)
> > > >
> > > > > before manually committing the consumer offset. On the produce side
> > you
> > > > can
> > > >
> > > > > turn on idempotence or transactions. But unless you are using
> > Streams,
> > > > you
> > > >
> > > > > chain those together yoursef. Woul

Re: Is exactly-once possible with kafka consumer-producer ?

2021-07-16 Thread Pushkar Deole
Chris,

I am not sure how this solves the problem scenario that we are experiencing
in customer environment: the scenario is:
1. application consumed record and processed it
2. the processed record is produced on destination topic and ack is received
3. Before committing offset back to consumed topic, the application pod
crashed or shut down by kubernetes or shut down due to some other issue

On Fri, Jul 16, 2021 at 8:57 PM Chris Larsen 
wrote:

> It is not possible out of the box, it is something you’ll have to write
> yourself. Would the following work?
>
> Consume -> Produce to primary topic-> get success ack back -> commit the
> consume
>
> Else if ack fails, produce to dead letter, then commit upon success
>
> Else if dead letter ack fails, exit (and thus don’t commit)
>
> Does that help? Someone please feel free to slap my hand but seems legit to
> me ;)
>
> Chris
>
>
>
> On Fri, Jul 16, 2021 at 10:48 Pushkar Deole  wrote:
>
> > Thanks Chris for the response!
> >
> > The current application is quite evolved and currently using
> >
> > consumer-producer model described above and we need to fix some bugs soon
> >
> > for a customer. So, moving to kafka streams seems bigger work. That's why
> >
> > looking at work around if same thing can be achieved with current model
> >
> > using transactions that span across consumer offset commits and producer
> >
> > send.
> >
> >
> >
> > We have made the producer idempotent and turned on transactions.
> >
> > However want to make offset commit to consumer and send from producer to
> be
> >
> > atomic? Is that possible?
> >
> >
> >
> > On Fri, Jul 16, 2021 at 6:18 PM Chris Larsen
>  > >
> >
> > wrote:
> >
> >
> >
> > > Pushkar, in kafka development for customer consumer/producer you handle
> > it.
> >
> > > However you can ensure the process stops (or sends message to dead
> > letter)
> >
> > > before manually committing the consumer offset. On the produce side you
> > can
> >
> > > turn on idempotence or transactions. But unless you are using Streams,
> > you
> >
> > > chain those together yoursef. Would kafka streams work for the
> operation
> >
> > > you’re looking to do?
> >
> > >
> >
> > > Best,
> >
> > > Chris
> >
> > >
> >
> > > On Fri, Jul 16, 2021 at 08:30 Pushkar Deole 
> > wrote:
> >
> > >
> >
> > > > Hi All,
> >
> > > >
> >
> > > >
> >
> > > >
> >
> > > > I am using a normal kafka consumer-producer in my microservice, with
> a
> >
> > > >
> >
> > > > simple model of consume from source topic -> process the record ->
> >
> > > produce
> >
> > > >
> >
> > > > on destination topic.
> >
> > > >
> >
> > > > I am mainly looking for exactly-once guarantee  wherein the offset
> > commit
> >
> > > >
> >
> > > > to consumed topic and produce on destination topic would both happen
> >
> > > >
> >
> > > > atomically or none of them would happen.
> >
> > > >
> >
> > > >
> >
> > > >
> >
> > > > In case of failures of service instance, if consumer has consumed,
> >
> > > >
> >
> > > > processed record and produced on destination topic but offset not yet
> >
> > > >
> >
> > > > committed back to source topic then produce should also not happen on
> >
> > > >
> >
> > > > destination topic.
> >
> > > >
> >
> > > > Is this behavior i.e. exactly-once, across consumers and producers,
> >
> > > >
> >
> > > > possible with transactional support in kafka?
> >
> > > >
> >
>


Re: Is exactly-once possible with kafka consumer-producer ?

2021-07-16 Thread Pushkar Deole
Thanks Chris for the response!
The current application is quite evolved and currently uses the
consumer-producer model described above, and we need to fix some bugs soon
for a customer. So, moving to kafka streams seems like bigger work. That's why
I am looking at a workaround to see if the same thing can be achieved with the
current model, using transactions that span the consumer offset commits and the
producer send.

We have made the producer idempotent and turned on transactions.
However, we want the offset commit by the consumer and the send from the producer
to be atomic. Is that possible?

On Fri, Jul 16, 2021 at 6:18 PM Chris Larsen 
wrote:

> Pushkar, in kafka development for customer consumer/producer you handle it.
> However you can ensure the process stops (or sends message to dead letter)
> before manually committing the consumer offset. On the produce side you can
> turn on idempotence or transactions. But unless you are using Streams, you
> chain those together yoursef. Would kafka streams work for the operation
> you’re looking to do?
>
> Best,
> Chris
>
> On Fri, Jul 16, 2021 at 08:30 Pushkar Deole  wrote:
>
> > Hi All,
> >
> >
> >
> > I am using a normal kafka consumer-producer in my microservice, with a
> >
> > simple model of consume from source topic -> process the record ->
> produce
> >
> > on destination topic.
> >
> > I am mainly looking for exactly-once guarantee  wherein the offset commit
> >
> > to consumed topic and produce on destination topic would both happen
> >
> > atomically or none of them would happen.
> >
> >
> >
> > In case of failures of service instance, if consumer has consumed,
> >
> > processed record and produced on destination topic but offset not yet
> >
> > committed back to source topic then produce should also not happen on
> >
> > destination topic.
> >
> > Is this behavior i.e. exactly-once, across consumers and producers,
> >
> > possible with transactional support in kafka?
> >
> > --
>
>
> [image: Confluent] <https://www.confluent.io>
> Chris Larsen
> Sr Solutions Engineer
> +1 847 274 3735 <+1+847+274+3735>
> Follow us: Blog <https://www.confluent.io/blog> | Twitter
> <https://twitter.com/ConfluentInc> | LinkedIn
> <https://www.linkedin.com/in/chrislarsen/>
>


Is exactly-once possible with kafka consumer-producer ?

2021-07-16 Thread Pushkar Deole
Hi All,

I am using a normal kafka consumer-producer in my microservice, with a
simple model of consume from source topic -> process the record -> produce
on destination topic.
I am mainly looking for exactly-once guarantee  wherein the offset commit
to consumed topic and produce on destination topic would both happen
atomically or none of them would happen.

In case of failures of service instance, if consumer has consumed,
processed record and produced on destination topic but offset not yet
committed back to source topic then produce should also not happen on
destination topic.
Is this behavior i.e. exactly-once, across consumers and producers,
possible with transactional support in kafka?


Re: Confluent's parallel consumer

2021-07-15 Thread Pushkar Deole
It is the consumer client node that has received the events and is processing
them...

On Thu, Jul 15, 2021 at 8:49 PM Israel Ekpo  wrote:

> Hi Pushkar,
>
> When you use the term “node/instance” are you referring to the Kafka
> Brokers or the consuming clients that are retrieving events from the
> broker?
>
> Please could you elaborate/clarify?
>
> On Thu, Jul 15, 2021 at 10:00 AM Pushkar Deole 
> wrote:
>
> > Well... with key-level ordering, i am mainly concerned about event loss,
> if
> > any, in below mentioned scenario:
> >
> > 1. since event1 with key1 and event2 with key2 are both part of the same
> > partition1
> > 2. key1 event has offset 30 while key2 has offset 40
> > 3. key2 is processed by background thread and offset committed which is
> 40
> > 4. before key1 gets processed by background thread, the instance/node
> goes
> > down
> > 5. partition1 gets rebalanced to node2 and start processing ahead of
> offset
> > 40, thus losing key1
> >
> >
> >
> > On Thu, Jul 15, 2021 at 7:18 PM Israel Ekpo 
> wrote:
> >
> > > Hi Pushkar,
> > >
> > > If you are selecting key-based ordering, you should not be concerned
> > about
> > > the other keys from the same partitions being committed first
> > >
> > > If that is a concern for your use cases then you should go with
> partition
> > > based ordering to ensure that the events are processed in the sequence
> > they
> > > are picked up from the topic partition.
> > >
> > > For commit mode, you have the asynchronous, synchronous and
> transactional
> > > modes. I think if you are concerned with the order of commits you
> should
> > > look into the last two modes.
> > >
> > > My recommendation would be to go with the partition based ordering with
> > > synchronous commits to start with.
> > >
> > >
> > >
> > > On Thu, Jul 15, 2021 at 7:36 AM Pushkar Deole 
> > > wrote:
> > >
> > > > Hi All, and Antony (author of below article)
> > > >
> > > > i came across this article which seemed interesting: Introducing
> > > > Confluent’s Parallel Consumer Message Processing Client
> > > > <
> > > >
> > >
> >
> https://www.confluent.io/blog/introducing-confluent-parallel-message-processing-client/
> > > > >
> > > >
> > > > I would like to use the key-level ordering strategy mentioned in the
> > > > article to scale my consumers, however I would like to check how the
> > > offset
> > > > commits are handled in this strategy
> > > > e.g. on partition 1, key1 has offsets 20 and 30 respectively and on
> the
> > > > same partition key2 has offset 40. With key-level ordering model,
> key2
> > > will
> > > > be processed by a different thread in background and might gets
> > processed
> > > > before events related to key1, in this case offset for key2 will be
> > > > committed before key1 gets processed ? How is this handled?
> > > >
> > >
> >
>


Re: Confluent's parallel consumer

2021-07-15 Thread Pushkar Deole
Well... with key-level ordering, i am mainly concerned about event loss, if
any, in below mentioned scenario:

1. since event1 with key1 and event2 with key2 are both part of the same
partition1
2. key1 event has offset 30 while key2 has offset 40
3. key2 is processed by background thread and offset committed which is 40
4. before key1 gets processed by background thread, the instance/node goes
down
5. partition1 gets rebalanced to node2, which starts processing from offset
40 onward, thus losing key1 (at offset 30)



On Thu, Jul 15, 2021 at 7:18 PM Israel Ekpo  wrote:

> Hi Pushkar,
>
> If you are selecting key-based ordering, you should not be concerned about
> the other keys from the same partitions being committed first
>
> If that is a concern for your use cases then you should go with partition
> based ordering to ensure that the events are processed in the sequence they
> are picked up from the topic partition.
>
> For commit mode, you have the asynchronous, synchronous and transactional
> modes. I think if you are concerned with the order of commits you should
> look into the last two modes.
>
> My recommendation would be to go with the partition based ordering with
> synchronous commits to start with.
>
>
>
> On Thu, Jul 15, 2021 at 7:36 AM Pushkar Deole 
> wrote:
>
> > Hi All, and Antony (author of below article)
> >
> > i came across this article which seemed interesting: Introducing
> > Confluent’s Parallel Consumer Message Processing Client
> > <
> >
> https://www.confluent.io/blog/introducing-confluent-parallel-message-processing-client/
> > >
> >
> > I would like to use the key-level ordering strategy mentioned in the
> > article to scale my consumers, however I would like to check how the
> offset
> > commits are handled in this strategy
> > e.g. on partition 1, key1 has offsets 20 and 30 respectively and on the
> > same partition key2 has offset 40. With key-level ordering model, key2
> will
> > be processed by a different thread in background and might gets processed
> > before events related to key1, in this case offset for key2 will be
> > committed before key1 gets processed ? How is this handled?
> >
>


Confluent's parallel consumer

2021-07-15 Thread Pushkar Deole
Hi All, and Antony (author of below article)

i came across this article which seemed interesting: Introducing
Confluent’s Parallel Consumer Message Processing Client


I would like to use the key-level ordering strategy mentioned in the
article to scale my consumers, however I would like to check how the offset
commits are handled in this strategy
e.g. on partition 1, key1 has offsets 20 and 30 respectively and on the
same partition key2 has offset 40. With key-level ordering model, key2 will
be processed by a different thread in background and might gets processed
before events related to key1, in this case offset for key2 will be
committed before key1 gets processed ? How is this handled?


Re: does kafka streams guarantee EOS with external producer used in application?

2021-07-14 Thread Pushkar Deole
That's exactly what my question was: since there is an external producer in
the application without a sink node in the topology, how will streams know
that the task is completed before committing the offset, or will it not know at
all?

Second question: can there be multiple sink nodes if the record is to be
produced on different topics based on some conditions (e.g., just giving a
simplified example: if the record contains field A then produce on topic A,
if field B then produce on topic B, etc.)?
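
On the second question, one option (to the best of my knowledge) is a single
sink whose topic is chosen per record with a TopicNameExtractor. A sketch
below, where the source topic name, the fieldA/fieldB checks and the target
topic names are purely illustrative.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class DynamicRoutingTopology {
    public static StreamsBuilder build() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> processed =
                builder.stream("source-topic", Consumed.with(Serdes.String(), Serdes.String())); // assumption

        // A single sink whose topic is chosen per record; the field checks are illustrative only.
        processed.to(
                (key, value, recordContext) -> {
                    if (value.contains("\"fieldA\"")) {
                        return "topic-A";
                    } else if (value.contains("\"fieldB\"")) {
                        return "topic-B";
                    }
                    return "topic-default";
                },
                Produced.with(Serdes.String(), Serdes.String()));
        return builder;
    }
}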

On Thu, Jul 15, 2021 at 4:29 AM Matthias J. Sax  wrote:

> Yes, if you use async writes, it could lead to data loss in case if
> failure as offsets could have been committed before the write succeeded.
> Only sync writes guard you from data loss.
>
> Note though that in Kafka Streams there is not manual commit anyway.
> Commits happen based on `commit.interval.ms` config. Calling
> `context.commit()` only schedules an earlier commit, but after the call
> returned, no commit happened yet (just a request to commit asap was
> registered).
>
>
> -Matthias
>
> On 7/14/21 12:00 AM, Pushkar Deole wrote:
> > If async writes are used with either manual or auto commit, where the
> > record is sent for an async write and the consumer thread commits the offset
> > immediately but the async write then fails,
> >
> > could this cause the loss of the event?
> >
> > On Wed, Jul 14, 2021 at 12:20 AM Matthias J. Sax 
> wrote:
> >
> >> If you want to use EOS, you cannot use your own producer, but you need
> >> to use a "sink node" (via `addSink()` or `to()`).
> >>
> >> For at-least-once, if you use sync-writes, you should still get
> >> at-least-once guarantees.
> >>
> >> -Matthias
> >>
> >> On 7/9/21 4:12 AM, Pushkar Deole wrote:
> >>> Matthias,
> >>>
> >>> Do you have any inputs on above queries?
> >>>
> >>> On Wed, Jun 30, 2021 at 7:15 PM Pushkar Deole 
> >> wrote:
> >>>
> >>>> Hi,
> >>>>
> >>>> Our application uses kafka streams that reads from a source topic,
> does
> >>>> processing on records and produces processed record on destination
> topic
> >>>> through the use of external producer i.e. the producer created via
> kafka
> >>>> producer API.
> >>>>
> >>>> Does this model still guarantee exactly once semantic or it won't?
> >>>>
> >>>> Currently we are using at_least_once, however the question is how
> >> streams
> >>>> handles offset commits here?
> >>>> Though the event is produced using synchronous API, could there be
> >>>> possibility of event loss in case streams commit offset before
> external
> >>>> producer hasn't produced event on destination topic yet?
> >>>>
> >>>
> >>
> >
>


Re: does kafka streams guarantee EOS with external producer used in application?

2021-07-14 Thread Pushkar Deole
If async writes are used with either manual or auto commit, where the
record is sent for an async write and the consumer thread commits the offset
immediately but the async write then fails,

could this cause the loss of the event?

On Wed, Jul 14, 2021 at 12:20 AM Matthias J. Sax  wrote:

> If you want to use EOS, you cannot use your own producer, but you need
> to use a "sink node" (via `addSink()` or `to()`).
>
> For at-least-once, if you use sync-writes, you should still get
> at-least-once guarantees.
>
> -Matthias
>
> On 7/9/21 4:12 AM, Pushkar Deole wrote:
> > Matthias,
> >
> > Do you have any inputs on above queries?
> >
> > On Wed, Jun 30, 2021 at 7:15 PM Pushkar Deole 
> wrote:
> >
> >> Hi,
> >>
> >> Our application uses kafka streams that reads from a source topic, does
> >> processing on records and produces processed record on destination topic
> >> through the use of external producer i.e. the producer created via kafka
> >> producer API.
> >>
> >> Does this model still guarantee exactly once semantic or it won't?
> >>
> >> Currently we are using at_least_once, however the question is how
> streams
> >> handles offset commits here?
> >> Though the event is produced using synchronous API, could there be
> >> possibility of event loss in case streams commit offset before external
> >> producer hasn't produced event on destination topic yet?
> >>
> >
>


Re: does kafka streams guarantee EOS with external producer used in application?

2021-07-09 Thread Pushkar Deole
Matthias,

Do you have any inputs on above queries?

On Wed, Jun 30, 2021 at 7:15 PM Pushkar Deole  wrote:

> Hi,
>
> Our application uses kafka streams that reads from a source topic, does
> processing on records and produces processed record on destination topic
> through the use of external producer i.e. the producer created via kafka
> producer API.
>
> Does this model still guarantee exactly once semantic or it won't?
>
> Currently we are using at_least_once, however the question is how streams
> handles offset commits here?
> Though the event is produced using synchronous API, could there be
> possibility of event loss in case streams commit offset before external
> producer hasn't produced event on destination topic yet?
>


does kafka streams guarantee EOS with external producer used in application?

2021-06-30 Thread Pushkar Deole
Hi,

Our application uses kafka streams that reads from a source topic, does
processing on records and produces processed record on destination topic
through the use of external producer i.e. the producer created via kafka
producer API.

Does this model still guarantee exactly once semantic or it won't?

Currently we are using at_least_once, however the question is how streams
handles offset commits here?
Though the event is produced using synchronous API, could there be
possibility of event loss in case streams commit offset before external
producer hasn't produced event on destination topic yet?


does consumer thread wait for producer to return (synchronous) in normal consume-process-produce topology? And how it is handled in streams?

2021-05-27 Thread Pushkar Deole
Hi,

I am trying to understand a few things:

In a normal consume-process-produce topology, the consumer polls records,
processes each one, and then hands it to the producer to produce on the
destination topic. In this case,
is the 'produce' a synchronous call, i.e. does it happen in the same consumer
thread, or does the produce take place in a background producer thread
asynchronously?

If asynchronous, then how can the consumer commit the offset before the produce
has happened successfully?
If synchronous, then the consumer thread is held up until the produce happens,
possibly increasing consumer lag?
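
A small sketch of the synchronous variant, assuming manual commits: send()
always hands the record to the producer's background sender thread, but
blocking on the returned Future makes the step effectively synchronous, so the
commit only happens after the broker has acknowledged every record in the
batch. The destination topic and processing step are placeholders.

import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ConsumeProcessProduce {
    // Assumes consumer (enable.auto.commit=false) and producer are already configured and subscribed.
    static void loop(KafkaConsumer<String, String> consumer, KafkaProducer<String, String> producer)
            throws Exception {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, String> rec : records) {
                String result = rec.value();                      // placeholder for processing
                // send() only enqueues the record; the network I/O happens on the producer's
                // background sender thread. Calling get() blocks the consumer thread until the
                // broker acks, turning the produce into a synchronous step.
                producer.send(new ProducerRecord<>("destination-topic", rec.key(), result)).get();
            }
            // Safe to commit: every record in the batch was acknowledged by the broker.
            consumer.commitSync();
        }
    }
}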


Re: does kafka support reducing topic partitions on the fly?

2021-05-19 Thread Pushkar Deole
Thanks... came across KIP however it is still under discussion:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-694%3A+Support+Reducing+Partitions+for+Topics

On Wed, May 19, 2021 at 8:30 PM Andrew Grant 
wrote:

> Hi Pushkar,
> I don't think so. Per https://kafka.apache.org/documentation/, "Kafka does
> not currently support reducing the number of partitions for a topic.".
> Andrew
>
> On Wed, May 19, 2021 at 10:17 AM Pushkar Deole 
> wrote:
>
> > Hi All,
> >
> > I have a question around reducing topic partitions on kafka. Currently,
> all
> > topics are 6 partitions. I want to reduce it to 3 for some of the topics.
> > Is this supported by kafka without deleting the topic?
> >
>
>
> --
> Andrew Grant
> 8054482621
>


does kafka support reducing topic partitions on the fly?

2021-05-19 Thread Pushkar Deole
Hi All,

I have a question around reducing topic partitions on kafka. Currently, all
topics are 6 partitions. I want to reduce it to 3 for some of the topics.
Is this supported by kafka without deleting the topic?


Re: kafka metric to monitor for consumer FETCH using disk caching and not going to disk

2021-05-16 Thread Pushkar Deole
thanks Alexandre... currently we are using kafka 2.5.0, so is there any
metric that can be used from 2.5.0?

On Sun, May 16, 2021 at 6:02 PM Alexandre Dupriez <
alexandre.dupr...@gmail.com> wrote:

> Hi Pushkar,
>
> If you are using Linux and Kafka 2.6.0+, the closest metric to what
> you are looking for is TotalDiskReadBytes [1], which measures data
> transfer at the block layer.
> Assuming your consumers are doing tail reads and there is no other
> activity which requires loading pages from the disk on your system
> (including log compaction from Kafka), you can determine if you are
> effectively hitting the disk or not.
>
> [1]
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-551%3A+Expose+disk+read+and+write+metrics
>
> Thanks,
> Alexandre
>
> Le sam. 15 mai 2021 à 05:49, Pushkar Deole  a écrit
> :
> >
> > Hi All,
> >
> > is there any metric that I can use to check whether the memory allocated
> > for kafka is sufficient for the given load on the brokers and whether
> kafka
> > is optimally making use of the page cache for consumer fetch reads, so that reads are
> > not going to disk each time, slowing down the overall consumer
> > processing and thus increasing consumer lag?
> >
> > which metric can tell that i should assign more memory to brokers?
>


kafka metric to monitor for consumer FETCH using disk caching and not going to disk

2021-05-14 Thread Pushkar Deole
Hi All,

is there any metric that I can use to check whether the memory allocated
for kafka is sufficient for the given load on the brokers, and whether kafka
is making optimal use of the page cache for consumer fetch reads, so that reads
are not going to disk each time, slowing down the overall consumer
processing and thus increasing consumer lag?

which metric can tell me that I should assign more memory to the brokers?


Re: Redis as state store

2021-03-21 Thread Pushkar Deole
Thanks Sophie... I was just thinking about what would be a good option for us,
whether local state stores or a redis state store, and it seems to me that
local state stores might prove more performant in the event of the
above-mentioned failures of the streams application.

The main reason we are thinking of moving to redis state stores is that
we want each application instance to have access to the state saved by the other
stream application instances. We tried to use a GlobalKTable backed by a topic:
each instance would save to that topic, which is then synchronized to the
GlobalKTable in each application instance. However, updating the GlobalKTable in
each application instance took around 100 ms, and within that window the next
event might need to be processed by an application instance that did not yet
have the proper state.

I was also looking today at some options available with local state stores,
and came across the fact that kafka streams also supports interactive queries
(with an RPC layer on top) over state stores, which allow a stream application
to query state stored in the local state store of another stream application
instance. Is that correct? If so, then we can try that option instead of a
redis state store. Let me know what you think.
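For reference, a rough sketch of that lookup path using Streams interactive
queries; Streams only tells you which instance hosts a key, the remote call
itself is application code. Store/host names are placeholders, and the metadata
method names differ slightly between Kafka versions (e.g. getActiveHost() before 2.7):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.KeyQueryMetadata;
    import org.apache.kafka.streams.StoreQueryParameters;
    import org.apache.kafka.streams.state.HostInfo;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

    public class StateLookup {
        private final KafkaStreams streams;
        private final HostInfo thisInstance;  // host:port this instance advertises via application.server

        public StateLookup(KafkaStreams streams, HostInfo thisInstance) {
            this.streams = streams;
            this.thisInstance = thisInstance;
        }

        public String lookup(String storeName, String key) {
            // Ask Streams which instance currently hosts this key.
            KeyQueryMetadata meta = streams.queryMetadataForKey(
                    storeName, key, Serdes.String().serializer());

            if (thisInstance.equals(meta.activeHost())) {
                // The key is local: query the store directly.
                ReadOnlyKeyValueStore<String, String> store = streams.store(
                        StoreQueryParameters.fromNameAndType(
                                storeName, QueryableStoreTypes.keyValueStore()));
                return store.get(key);
            }
            // Otherwise the key lives on meta.activeHost(); issue your own HTTP/RPC call
            // to that instance, which runs the same local lookup and returns the value.
            return fetchRemotely(meta.activeHost(), storeName, key);
        }

        private String fetchRemotely(HostInfo host, String storeName, String key) {
            // Application-specific; Streams does not provide this transport.
            throw new UnsupportedOperationException("implement with your RPC/HTTP client");
        }
    }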

On Sun, Mar 21, 2021 at 6:38 AM Sophie Blee-Goldman
 wrote:

> Yes, if you have to restore from the changelog from scratch then this will
> definitely impact
> the application's performance. This is the current state of things for EOS
> applications that use
> some kind of local storage such as the in-memory or rocksdb state
> stores.The point of EOS is
> to be 100% correct, not to maximize performance -- it's a tradeoff and you
> need to decide what
> characteristics are most important for the specific application and use
> case.
>
> That said, obviously better performance is always a good thing when it's
> possible to do without
> sacrificing processing semantics. That's why I proposed to buffer updates;
> if we can avoid dirtying
> the store in the first place, then there's no need to wipe out all the
> state and rebuild from the changelog
> from scratch. So yes, this was intended as an alternative proposal which
> would improve the performance
> for any EOS application regardless of whether it uses local or remote
> storage.
>
> But as with all things, this has tradeoffs of its own: for one thing it's
> probably a significantly larger
> effort to implement, so if we want to correct the EOS + remote storage
> situation quickly then this
> approach would not be the best way to go. Also, buffering updates of course
> requires additional
> resources (ie storage and/or memory), so some users may actually prefer to
> take an occasional
> performance hit to keep their app lightweight.
>
> Anyways, these are just some thoughts on how to improve the current
> situation. Maybe there are
> even more options to address this problem which haven't been considered
> yet. Let us know if you
> have a better idea :)
>
> On Fri, Mar 19, 2021 at 11:50 PM Pushkar Deole 
> wrote:
>
> > Thanks Sophie... that answers my question, however still worried about
> some
> > other aspects:
> >
> > 1. If redis is to be restored from changelog topic: what would happen if
> i
> > have 3 stream applications and 1 instance went down ... will other 2
> > instances halt until entire existing state from redis is wiped out and
> > entire state is restored back from changelog topic? If so then it would
> > have a significant performance hit especially when this happens during
> > heavy traffic hours
> >
> > 2. Will #1 be solved by the 2nd alternative that you mentioned in the
> > comment i.e 'An alternative is to just start buffering updates in-memory
> > (or in rocksdb, this could be configurable) and then avoid dirtying the
> > remote storage in the first place as we would only flush the data out to
> it
> > during a commit'  It looks to me that this won't need rebuilding entire
> > state store because changelog is disabled, and this alternative would
> avoid
> > making the state store inconsistent in first place, thus saving wipe out
> > and rebuild ? If so then this also doesn't need to halt other stream
> > applications and would prove much more better approach from performance
> > point of view. Is that correct?
> >
> > On Sat, Mar 20, 2021 at 2:25 AM Sophie Blee-Goldman
> >  wrote:
> >
> > > Hey Pushkar, yes, the data will still be backed by a changelog topic
> > unless
> > > the
> > > user explicitly disables logging for that state store. The fault
> > tolerance
> > > mechanism
> > > of Kafka Streams is based on changelogging, therefore there are no
> > > correctness
> > > guarantees if you decide to disable it.
> > &g

Re: Redis as state store

2021-03-20 Thread Pushkar Deole
Thanks Sophie... that answers my question, however I am still worried about some
other aspects:

1. If redis is to be restored from the changelog topic: what would happen if I
have 3 stream application instances and 1 instance goes down... will the other 2
instances halt until the entire existing state in redis is wiped out and
the entire state is restored back from the changelog topic? If so, it would
be a significant performance hit, especially when this happens during
heavy traffic hours.

2. Would #1 be solved by the 2nd alternative that you mentioned in the
comment, i.e. 'An alternative is to just start buffering updates in-memory
(or in rocksdb, this could be configurable) and then avoid dirtying the
remote storage in the first place as we would only flush the data out to it
during a commit'? It looks to me that this won't need rebuilding the entire
state store because the changelog is disabled, and this alternative would avoid
making the state store inconsistent in the first place, thus avoiding the wipe-out
and rebuild. If so, then this also doesn't need to halt the other stream
application instances and would prove a much better approach from a performance
point of view. Is that correct?
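Purely to illustrate the buffering idea from that comment (this is not a real
Streams StateStore implementation; the Jedis client and the flush/discard hooks
are assumptions): writes go to an in-memory map and reach redis only on flush,
so an instance that dies mid-interval never dirties the remote store:

    import java.util.HashMap;
    import java.util.Map;
    import redis.clients.jedis.Jedis;

    // Buffers writes locally and pushes them to Redis only when flush() is called
    // (e.g. from the commit path), so a crash between commits leaves no
    // half-processed state in the remote store.
    public class BufferedRedisWriter {
        private final Jedis jedis;
        private final Map<String, String> dirty = new HashMap<>();

        public BufferedRedisWriter(Jedis jedis) {
            this.jedis = jedis;
        }

        public void put(String key, String value) {
            dirty.put(key, value);          // only the local buffer is touched
        }

        public String get(String key) {
            String buffered = dirty.get(key);
            return buffered != null ? buffered : jedis.get(key);
        }

        public void flush() {               // call on commit
            for (Map.Entry<String, String> e : dirty.entrySet()) {
                jedis.set(e.getKey(), e.getValue());
            }
            dirty.clear();
        }

        public void discard() {             // call when work since the last commit is abandoned
            dirty.clear();
        }
    }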

On Sat, Mar 20, 2021 at 2:25 AM Sophie Blee-Goldman
 wrote:

> Hey Pushkar, yes, the data will still be backed by a changelog topic unless
> the
> user explicitly disables logging for that state store. The fault tolerance
> mechanism
> of Kafka Streams is based on changelogging, therefore there are no
> correctness
> guarantees if you decide to disable it.
>
> That said, I'm guessing many users do in fact disable the changelog when
> plugging
> in a remote store with it's own fault tolerance guarantees -- is that what
> you're getting
> at? We could definitely build in better support for that case, as either an
> additional
> optimization on top of KAFKA-12475
> <https://issues.apache.org/jira/browse/KAFKA-12475> or as an alternative
> implementation to fix the
> underlying EOS problem. Check out my latest comment on the ticket here
> <
> https://issues.apache.org/jira/browse/KAFKA-12475?focusedCommentId=17305191=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17305191
> >
>
> Does that address your question?
>
> On Fri, Mar 19, 2021 at 10:14 AM Pushkar Deole 
> wrote:
>
> > Hello Sophie,
> >
> > may be i am missing something here, however can you let me know how a
> redis
> > based state store will be wiped off the inconsistent state in case stream
> > application dies in the middle of processing e.g. stream application
> > consumed from source topic, processed source event and saved state to
> redis
> > and before producing event on destination topic, the stream application
> had
> > error.
> > If this occurs with a rocksDB or in-memory state store, it will be
> rebuilt
> > from changelog topic, however for redis state store, how it will wiped
> off
> > the state ? are we saying here that the data stored in redis will still
> be
> > backed by changelog topic and redis will be restored from backed topic in
> > case of stream application error?
> >
> > On Tue, Mar 16, 2021 at 12:18 AM Sophie Blee-Goldman
> >  wrote:
> >
> > > This certainly does seem like a flaw in the Streams API, although of
> > course
> > > Streams is just
> > > in general not designed for use with remote anything (API calls,
> stores,
> > > etc)
> > >
> > > That said, I don't see any reason why we *couldn't* have better support
> > for
> > > remote state stores.
> > > Note that there's currently no deleteAll method on the store interface,
> > and
> > > not all databases
> > > necessarily support that. But we could add a default implementation
> which
> > > just calls delete(key)
> > > on all keys in the state store, and for the RocksDB-based state stores
> we
> > > still wipe out the state
> > > as usual (and recommend the same for any custom StateStore which is
> local
> > > rather than remote).
> > > Obviously falling back on individual delete(key) operations for all the
> > > data in the entire store will
> > > have worse performance, but that's better than silently breaking EOS
> when
> > > deleteAll is not available
> > > on a remote store.
> > >
> > > Would you be interested in doing a KIP for this? We should also file a
> > JIRA
> > > with the above explanation,
> > > so that other users of remote storage are aware of this limitation. And
> > > definitely document this somewhere
> > >
> > >
> > > On Mon, Mar 15, 2021 at 8:04 AM Bruno Cadonna
>  > >
> > > 

Re: Redis as state store

2021-03-19 Thread Pushkar Deole
Hello Sophie,

maybe I am missing something here, however can you let me know how a redis-
based state store will be wiped of its inconsistent state in case the stream
application dies in the middle of processing? E.g. the stream application
consumed from the source topic, processed the source event and saved state to redis,
and before producing the event on the destination topic, the stream application hit
an error.
If this occurs with a rocksDB or in-memory state store, it will be rebuilt
from the changelog topic; however for a redis state store, how will it be wiped of
the state? Are we saying here that the data stored in redis will still be
backed by a changelog topic, and redis will be restored from the backing topic in
case of a stream application error?

On Tue, Mar 16, 2021 at 12:18 AM Sophie Blee-Goldman
 wrote:

> This certainly does seem like a flaw in the Streams API, although of course
> Streams is just
> in general not designed for use with remote anything (API calls, stores,
> etc)
>
> That said, I don't see any reason why we *couldn't* have better support for
> remote state stores.
> Note that there's currently no deleteAll method on the store interface, and
> not all databases
> necessarily support that. But we could add a default implementation which
> just calls delete(key)
> on all keys in the state store, and for the RocksDB-based state stores we
> still wipe out the state
> as usual (and recommend the same for any custom StateStore which is local
> rather than remote).
> Obviously falling back on individual delete(key) operations for all the
> data in the entire store will
> have worse performance, but that's better than silently breaking EOS when
> deleteAll is not available
> on a remote store.
>
> Would you be interested in doing a KIP for this? We should also file a JIRA
> with the above explanation,
> so that other users of remote storage are aware of this limitation. And
> definitely document this somewhere
>
>
> On Mon, Mar 15, 2021 at 8:04 AM Bruno Cadonna 
> wrote:
>
> > Hi Alex,
> >
> > I guess wiping out the state directory is easier code-wise, faster,
> > and/or at the time of development the developers did not design for
> > remote state stores. But I do actually not know the exact reason.
> >
> > Off the top of my head, I do not know how to solve this for remote state
> > stores. Using the uncaught exception handler is not good, because a
> > computing node could fail without giving the JVM the opportunity to
> > throw an exception.
> >
> > In your tests, try to increase the commit interval to a high value and
> > see if you get inconsistencies. You should get an inconsistency if the
> > state store maintains counts for keys and after the last commit before
> > the failure, the Streams app puts an event with a new key K with value 1
> > into the state store. After failover, Streams would put the same event
> > with key K again into the state store. If the state store deleted all of
> > its data, Streams would put again value 1, but if the state store did
> > not delete all data, Streams would put value 2 which is wrong because it
> > would count the same event twice.
> >
> > Best,
> > Bruno
> >
> >
> > On 15.03.21 15:20, Alex Craig wrote:
> > > Bruno,
> > > Thanks for the info!  that makes sense.  Of course now I have more
> > > questions.  :)  Do you know why this is being done outside of the state
> > > store API?  I assume there are reasons why a "deleteAll()" type of
> > function
> > > wouldn't work, thereby allowing a state store to purge itself?  And
> maybe
> > > more importantly, is there a way to achieve a similar behavior with a
> 3rd
> > > party store?  I'm not sure if hooking into the uncaught exception
> handler
> > > might be a good way to purge/drop a state store in the event of a fatal
> > > error?  I did setup a MongoDB state store recently as part of a POC and
> > was
> > > testing it with EOS enabled.  (forcing crashes to occur and checking
> that
> > > the result of my aggregation was still accurate)  I was unable to cause
> > > inconsistent data in the mongo store (which is good!), though of
> course I
> > > may just have been getting lucky.  Thanks again for your help,
> > >
> > > Alex
> > >
> > > On Mon, Mar 15, 2021 at 8:59 AM Pushkar Deole 
> > wrote:
> > >
> > >> Bruno,
> > >>
> > >> i tried to explain this in 'kafka user's language through above
> > mentioned
> > >> scenario, hope i put it properly -:) and correct me if i am wrong
> > >>
> > >> On Mon, Mar 15, 2

Re: does Kafka exactly-once guarantee also apply to kafka state stores?

2021-03-19 Thread Pushkar Deole
Matthias,

With reference to your response above, I came across the JIRA ticket
https://issues.apache.org/jira/browse/KAFKA-12475

RocksDB or in-memory state stores are always backed by a changelog
topic, so they can be rebuilt from scratch from the changelog topic.
However, how can a remote state store be made consistent in case of an error?
E.g. the stream consumed an event from the source topic, processed it and stored state
in redis, and before producing the event to the destination topic the application died.
In this case, the offset won't be committed for the source topic, and the destination
topic doesn't have the processed event anyway, however redis holds the new state.
How can redis be wiped of the state that was saved while processing the above
event(s)?

On Wed, Jan 6, 2021 at 11:18 PM Matthias J. Sax  wrote:

> Well, that is exactly what I mean by "it depends on the state store
> implementation".
>
> For this case, you cannot get exactly-once.
>
> There are actually ideas to improve the implementation to support the
> case you describe, but there is no timeline for this change yet. Not
> even sure if there is already a Jira ticket...
>
>
> -Matthias
>
> On 1/6/21 2:32 AM, Pushkar Deole wrote:
> > The question is if we want to use state store of 3rd party, e.g. say
> Redis,
> > how can the store be consistent with rest of the system i.e. source and
> > destination topics...
> >
> > e.g. record is consumed from source, processed, state store updated with
> > some state, but before writing to destination there is failure
> > Now, in this case, with kafka state store, it will be wiped off the state
> > stored since the transaction failed.
> >
> > But with Redis, the state store is updated with the new state and there
> is
> > no way to revert back
> >
> > On Tue, Jan 5, 2021 at 11:11 PM Matthias J. Sax 
> wrote:
> >
> >> It depends on the store implementation. Atm, EOS for state store is
> >> achieved by re-creating the state store in case of failure from the
> >> changelog topic.
> >>
> >> For RocksDB stores, we wipe out the local state directories and create a
> >> new empty RocksDB and for in-memory stores the content is "lost" anyway
> >> when state is migrated, and we reply the changelog into an empty store
> >> before processing resumes.
> >>
> >>
> >> -Matthias
> >>
> >> On 1/5/21 6:27 AM, Alex Craig wrote:
> >>> I don't think he's asking about data-loss, but rather data consistency.
> >>> (in the event of an exception or application crash, will EOS ensure
> that
> >>> the state store data is consistent)  My understanding is that it DOES
> >> apply
> >>> to state stores as well, in the sense that a failure during processing
> >>> would mean that the commit wouldn't get flushed and therefore wouldn't
> >> get
> >>> double-counted once processing resumes and message is re-processed.
> >>> As far as using something other than RocksDB, I think as long as you
> are
> >>> implementing the state store API correctly you should be fine.  I did a
> >> POC
> >>> recently using Mongo state-stores with EOS enabled and it worked
> >> correctly,
> >>> even when I intentionally introduced failures and crashes.
> >>>
> >>> -alex
> >>>
> >>> On Tue, Jan 5, 2021 at 1:09 AM Ning Zhang 
> >> wrote:
> >>>
> >>>> If there is a "change-log" topic to back up the state store, then it
> may
> >>>> not lose data.
> >>>>
> >>>> Also, if the third party store is not "kafka community certified" (or
> >> not
> >>>> well-maintained), it may have chances to lose data (in different
> ways).
> >>>>
> >>>>
> >>>>
> >>>> On 2021/01/05 04:56:12, Pushkar Deole  wrote:
> >>>>> In case we opt to choose some third party store instead of kafka's
> >> stores
> >>>>> for storing state (e.g. Redis cache or Ignite), then will we lose the
> >>>>> exactly-once guarantee provided by kafka and the state stores can be
> in
> >>>> an
> >>>>> inconsistent state ?
> >>>>>
> >>>>> On Sat, Jan 2, 2021 at 4:56 AM Ning Zhang 
> >>>> wrote:
> >>>>>
> >>>>>> The physical store behind "state store" is change-log kafka topic.
> In
> >>>>>> Kafka stream, if something fails in the middle, the "state store"

Re: Redis as state store

2021-03-15 Thread Pushkar Deole
Bruno,

I tried to explain this in 'kafka user's language' through the above-mentioned
scenario; hope I put it properly :-) and correct me if I am wrong.

On Mon, Mar 15, 2021 at 7:23 PM Pushkar Deole  wrote:

> This is what I understand could be the issue with external state store:
>
> kafka stream application consumes source topic, does processing, stores
> state to kafka state store (this is backed by topic) and before producing
> event on destination topic, the application fails with some issue. In
> this case, the transaction has failed, so kafka guarantees either all or
> none, means offset written to source topic, state written to state store
> topic, output produced on destination topic... all of these happen or none
> of these and in this failure scenario it is none of these.
>
> Assume you have redis state store, and you updated the state into redis
> and stream application failed. Now, you have source topic and destination
> topic consistent i.e. offset is not committed to source topic and output
> not produced on destination topic, but you redis state store is
> inconsistent with that since it is external state store and kafka can't
> guarantee rollback ot state written there
>
> On Mon, Mar 15, 2021 at 6:30 PM Alex Craig  wrote:
>
>> " Another issue with 3rd party state stores could be violation of
>> exactly-once guarantee provided by kafka streams in the event of a failure
>> of streams application instance"
>>
>> I've heard this before but would love to know more about how a custom
>> state
>> store would be at any greater risk than RocksDB as far as exactly-once
>> guarantees are concerned.  They all implement the same interface, so as
>> long as you're correctly implementing get(), put(), delete(), flush(),
>> etc,
>> you should be fine right?  In other words, I don't think there is any
>> special "exactly once magic" that is baked into the RocksDB store code.  I
>> could be wrong though so I'd love to hear people's thoughts, thanks,
>>
>> Alex C
>>
>> On Sun, Mar 14, 2021 at 4:58 PM Parthasarathy, Mohan 
>> wrote:
>>
>> > Thanks for the responses. In the worst case, I might have to keep both
>> > rocksdb for local store and keep an external store like Redis.
>> >
>> > -mohan
>> >
>> >
>> > On 3/13/21, 8:53 PM, "Pushkar Deole"  wrote:
>> >
>> > Another issue with 3rd party state stores could be violation of
>> > exactly-once guarantee provided by kafka streams in the event of a
>> > failure
>> > of streams application instance.
>> > Kafka provides exactly once guarantee for consumer -> process ->
>> > produce
>> > through transactions and with the use of state store like redis,
>> this
>> > guarantee is weaker
>> >
>> > On Sat, Mar 13, 2021 at 5:28 AM Guozhang Wang 
>> > wrote:
>> >
>> > > Hello Mohan,
>> > >
>> > > I think what you had in mind works with Redis, since it is a
>> remote
>> > state
>> > > store engine, it does not have the co-partitioning requirements as
>> > local
>> > > state stores.
>> > >
>> > > One thing you'd need to tune KS though is that with remote stores,
>> > the
>> > > processing latency may be larger, and since Kafka Streams process
>> all
>> > > records of a single partition in order, synchronously, you may
>> need
>> > to tune
>> > > the poll interval configs etc to make sure KS would stay in the
>> > consumer
>> > > group and not trigger unnecessary rebalances.
>> > >
>> > > Guozhang
>> > >
>> > > On Thu, Mar 11, 2021 at 6:41 PM Parthasarathy, Mohan <
>> > mpart...@hpe.com>
>> > > wrote:
>> > >
>> > > > Hi,
>> > > >
>> > > > I have a use case where messages come in with some key gets
>> > assigned some
>> > > > partition and the state gets created. Later, key changes (but
>> still
>> > > > contains the old key in the message) and gets sent to a
>> different
>> > > > partition. I want to be able to grab the old state using the old
>> > key
>> > > before
>> > > > creating the new state on this instance. Redis as a  state store
>> > makes it
>> > > > easy to implement this where I can simply do a lookup before
>> > creating the
>> > > > state. I see an implementation here :
>> > > >
>> > >
>> >
>> https://github.com/andreas-schroeder/redisks/tree/master/src/main/java/com/github/andreas_schroeder/redisks
>> > > >
>> > > > Has anyone tried this ? Any caveats.
>> > > >
>> > > > Thanks
>> > > > Mohan
>> > > >
>> > > >
>> > >
>> > > --
>> > > -- Guozhang
>> > >
>> >
>> >
>> >
>>
>


Re: Redis as state store

2021-03-15 Thread Pushkar Deole
This is what I understand could be the issue with an external state store:

a kafka streams application consumes the source topic, does processing, stores
state to a kafka state store (this is backed by a topic) and, before producing the
event on the destination topic, the application fails with some issue. In
this case, the transaction has failed, so kafka guarantees either all or
none: the offset committed for the source topic, the state written to the state store
topic, and the output produced on the destination topic... all of these happen or none
of them, and in this failure scenario it is none of them.

Assume you have a redis state store, and you updated the state in redis and the
stream application failed. Now your source topic and destination topic are
consistent, i.e. the offset is not committed to the source topic and the output is
not produced on the destination topic, but your redis state store is inconsistent
with that, since it is an external state store and kafka can't guarantee
rollback of the state written there.
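For context, that all-or-nothing behaviour comes from Kafka transactions
committing the output records and the consumed offsets together; a minimal
sketch with plain clients (ids and topic names are placeholders, and real code
must also handle fatal producer exceptions by closing the producer):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.TopicPartition;

    public class TransactionalCopy {
        public static void main(String[] args) {
            Properties cp = new Properties();
            cp.put("bootstrap.servers", "localhost:9092");
            cp.put("group.id", "eos-group");
            cp.put("enable.auto.commit", "false");
            cp.put("isolation.level", "read_committed");
            cp.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            cp.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            Properties pp = new Properties();
            pp.put("bootstrap.servers", "localhost:9092");
            pp.put("transactional.id", "eos-copy-1");
            pp.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            pp.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cp);
            KafkaProducer<String, String> producer = new KafkaProducer<>(pp);
            producer.initTransactions();
            consumer.subscribe(Collections.singletonList("source-topic"));

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                if (records.isEmpty()) continue;
                producer.beginTransaction();
                try {
                    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                    for (ConsumerRecord<String, String> r : records) {
                        producer.send(new ProducerRecord<>("destination-topic", r.key(), r.value()));
                        offsets.put(new TopicPartition(r.topic(), r.partition()),
                                    new OffsetAndMetadata(r.offset() + 1));
                    }
                    // The consumed offsets are committed as part of the same transaction,
                    // so the outputs and the offsets become visible together, or not at all.
                    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                    producer.commitTransaction();
                } catch (Exception e) {
                    producer.abortTransaction();  // outputs and offsets are both discarded
                }
            }
        }
    }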

On Mon, Mar 15, 2021 at 6:30 PM Alex Craig  wrote:

> " Another issue with 3rd party state stores could be violation of
> exactly-once guarantee provided by kafka streams in the event of a failure
> of streams application instance"
>
> I've heard this before but would love to know more about how a custom state
> store would be at any greater risk than RocksDB as far as exactly-once
> guarantees are concerned.  They all implement the same interface, so as
> long as you're correctly implementing get(), put(), delete(), flush(), etc,
> you should be fine right?  In other words, I don't think there is any
> special "exactly once magic" that is baked into the RocksDB store code.  I
> could be wrong though so I'd love to hear people's thoughts, thanks,
>
> Alex C
>
> On Sun, Mar 14, 2021 at 4:58 PM Parthasarathy, Mohan 
> wrote:
>
> > Thanks for the responses. In the worst case, I might have to keep both
> > rocksdb for local store and keep an external store like Redis.
> >
> > -mohan
> >
> >
> > On 3/13/21, 8:53 PM, "Pushkar Deole"  wrote:
> >
> > Another issue with 3rd party state stores could be violation of
> > exactly-once guarantee provided by kafka streams in the event of a
> > failure
> > of streams application instance.
> > Kafka provides exactly once guarantee for consumer -> process ->
> > produce
> > through transactions and with the use of state store like redis, this
> > guarantee is weaker
> >
> > On Sat, Mar 13, 2021 at 5:28 AM Guozhang Wang 
> > wrote:
> >
> > > Hello Mohan,
> > >
> > > I think what you had in mind works with Redis, since it is a remote
> > state
> > > store engine, it does not have the co-partitioning requirements as
> > local
> > > state stores.
> > >
> > > One thing you'd need to tune KS though is that with remote stores,
> > the
> > > processing latency may be larger, and since Kafka Streams process
> all
> > > records of a single partition in order, synchronously, you may need
> > to tune
> > > the poll interval configs etc to make sure KS would stay in the
> > consumer
> > > group and not trigger unnecessary rebalances.
> > >
> > > Guozhang
> > >
> > > On Thu, Mar 11, 2021 at 6:41 PM Parthasarathy, Mohan <
> > mpart...@hpe.com>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > I have a use case where messages come in with some key gets
> > assigned some
> > > > partition and the state gets created. Later, key changes (but
> still
> > > > contains the old key in the message) and gets sent to a different
> > > > partition. I want to be able to grab the old state using the old
> > key
> > > before
> > > > creating the new state on this instance. Redis as a  state store
> > makes it
> > > > easy to implement this where I can simply do a lookup before
> > creating the
> > > > state. I see an implementation here :
> > > >
> > >
> >
> https://github.com/andreas-schroeder/redisks/tree/master/src/main/java/com/github/andreas_schroeder/redisks
> > > >
> > > > Has anyone tried this ? Any caveats.
> > > >
> > > > Thanks
> > > > Mohan
> > > >
> > > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
> >
> >
>


Re: Redis as state store

2021-03-13 Thread Pushkar Deole
Another issue with 3rd-party state stores could be violation of the
exactly-once guarantee provided by kafka streams in the event of a failure
of a streams application instance.
Kafka provides an exactly-once guarantee for consume -> process -> produce
through transactions, and with the use of a state store like redis this
guarantee is weaker.

On Sat, Mar 13, 2021 at 5:28 AM Guozhang Wang  wrote:

> Hello Mohan,
>
> I think what you had in mind works with Redis, since it is a remote state
> store engine, it does not have the co-partitioning requirements as local
> state stores.
>
> One thing you'd need to tune KS though is that with remote stores, the
> processing latency may be larger, and since Kafka Streams process all
> records of a single partition in order, synchronously, you may need to tune
> the poll interval configs etc to make sure KS would stay in the consumer
> group and not trigger unnecessary rebalances.
>
> Guozhang
>
> On Thu, Mar 11, 2021 at 6:41 PM Parthasarathy, Mohan 
> wrote:
>
> > Hi,
> >
> > I have a use case where messages come in with some key gets assigned some
> > partition and the state gets created. Later, key changes (but still
> > contains the old key in the message) and gets sent to a different
> > partition. I want to be able to grab the old state using the old key
> before
> > creating the new state on this instance. Redis as a  state store makes it
> > easy to implement this where I can simply do a lookup before creating the
> > state. I see an implementation here :
> >
> https://github.com/andreas-schroeder/redisks/tree/master/src/main/java/com/github/andreas_schroeder/redisks
> >
> > Has anyone tried this ? Any caveats.
> >
> > Thanks
> > Mohan
> >
> >
>
> --
> -- Guozhang
>


Re: options for kafka cluster backup?

2021-03-07 Thread Pushkar Deole
Thank you all!

Blake, for your comment:

"It'll require having a HA cluster running in another region, of course.
One other caveat is that it doesn't preserve the offsets of the records"

-> I believe I can't afford to keep another cluster running, due to cost
reasons. Can you elaborate on the offset part? If offsets are not preserved,
then how does the backup cluster know where to start processing for each topic?

"For example, you could use a Kafka Connect s3 sink. You'd have to write
some disaster-recovery code to restore lost data from s3 into Kafka."

-> Again here the same question: does s3 also store the offset for each topic
as it is modified in kafka? If not, then when the backup is restored back into
the kafka cluster, how will it know where to process each topic from?
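For what it's worth, a minimal MirrorMaker 2 configuration sketch for replicating
a primary region to a backup region (aliases, bootstrap servers and patterns are
placeholders). Replication is asynchronous, so some lag, and the loss of
not-yet-replicated records in a disaster, are possible; offset translation via
checkpoints (and, on newer versions, sync.group.offsets.enabled) is what lets
consumer groups resume on the backup cluster:

    # mm2.properties (run with bin/connect-mirror-maker.sh mm2.properties)
    clusters = primary, backup
    primary.bootstrap.servers = primary-broker1:9092,primary-broker2:9092
    backup.bootstrap.servers  = backup-broker1:9092,backup-broker2:9092

    # replicate from primary to backup only
    primary->backup.enabled = true
    primary->backup.topics = .*
    primary->backup.groups = .*

    # emit checkpoints so consumer group offsets can be translated on the backup side
    emit.checkpoints.enabled = true
    # requires a newer Kafka version (KIP-545)
    sync.group.offsets.enabled = true

    replication.factor = 3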

On Sat, Mar 6, 2021 at 4:44 PM Himanshu Shukla 
wrote:

> Hi Pushkar,
>
> you could also look at the available Kafka-connect plugins. It provides
> many connectors which could be leveraged to move the data in/out from
> Kafka.
>
> On Sat, Mar 6, 2021 at 10:18 AM Blake Miller 
> wrote:
>
> > MirrorMaker is one reasonable way to do this, certainly it can replicate
> to
> > another region, with most of the latency being the unavoidable kind, if
> you
> > give it enough resources.
> >
> > It'll require having a HA cluster running in another region, of course.
> One
> > other caveat is that it doesn't preserve the offsets of the records.
> That's
> > probably okay for your use-case, but you should be aware of it.
> >
> > Since what you want is a backup, there are many ways to do that which
> might
> > be cheaper than another Kafka cluster.
> >
> > For example, you could use a Kafka Connect s3 sink. You'd have to write
> > some disaster-recovery code to restore lost data from s3 into Kafka.
> >
> > https://www.confluent.io/blog/apache-kafka-to-amazon-s3-exactly-once/
> >
> > There are many other sinks available, but s3 might be a reasonable choice
> > for backup. It's inexpensive and reliable.
> >
> > On Fri, Mar 5, 2021, 2:48 AM Pushkar Deole  wrote:
> >
> > > Yes.. so the requirement for me is to have data backed up or replicated
> > in
> > > a different 'region' to cater for disaster scenarios and recover from
> > them
> > >
> > > On Fri, Mar 5, 2021 at 3:01 PM Ran Lupovich 
> > wrote:
> > >
> > > > I guess that in case of avoiding data lose you would need to use 3
> > > replica
> > > > in different rack/sites awareness to avoid data lose, Confluent's
> > > > Replicator or MirrorMaker are for copying data from one cluster to
> > > another
> > > > usually in different dc / regions, If I am not mistaken
> > > >
> > > > בתאריך יום ו׳, 5 במרץ 2021, 11:21, מאת Pushkar Deole ‏<
> > > > pdeole2...@gmail.com
> > > > >:
> > > >
> > > > > Thanks Luke... is the mirror maker asynchronous? What will be
> typical
> > > lag
> > > > > between the replicated cluster and running cluster and in case of
> > > > disaster,
> > > > > what are the chances of data loss?
> > > > >
> > > > > On Fri, Mar 5, 2021 at 11:37 AM Luke Chen 
> wrote:
> > > > >
> > > > > > Hi Pushkar,
> > > > > > MirrorMaker is what you're looking for.
> > > > > > ref:
> > > > https://kafka.apache.org/documentation/#georeplication-mirrormaker
> > > > > >
> > > > > > Thanks.
> > > > > > Luke
> > > > > >
> > > > > > On Fri, Mar 5, 2021 at 1:50 PM Pushkar Deole <
> pdeole2...@gmail.com
> > >
> > > > > wrote:
> > > > > >
> > > > > > > Hi All,
> > > > > > >
> > > > > > > I was looking for some options to backup a running kafka
> cluster,
> > > for
> > > > > > > disaster recovery requirements. Can someone provide what are
> the
> > > > > > available
> > > > > > > options to backup and restore a running cluster in case the
> > entire
> > > > > > cluster
> > > > > > > goes down?
> > > > > > >
> > > > > > > Thanks..
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
>
> --
> Regards,
> Himanshu Shukla
>


Re: options for kafka cluster backup?

2021-03-05 Thread Pushkar Deole
Yes.. so the requirement for me is to have data backed up or replicated in
a different 'region' to cater for disaster scenarios and recover from them

On Fri, Mar 5, 2021 at 3:01 PM Ran Lupovich  wrote:

> I guess that in case of avoiding data lose you would need to use 3 replica
> in different rack/sites awareness to avoid data lose, Confluent's
> Replicator or MirrorMaker are for copying data from one cluster to another
> usually in different dc / regions, If I am not mistaken
>
> בתאריך יום ו׳, 5 במרץ 2021, 11:21, מאת Pushkar Deole ‏<
> pdeole2...@gmail.com
> >:
>
> > Thanks Luke... is the mirror maker asynchronous? What will be typical lag
> > between the replicated cluster and running cluster and in case of
> disaster,
> > what are the chances of data loss?
> >
> > On Fri, Mar 5, 2021 at 11:37 AM Luke Chen  wrote:
> >
> > > Hi Pushkar,
> > > MirrorMaker is what you're looking for.
> > > ref:
> https://kafka.apache.org/documentation/#georeplication-mirrormaker
> > >
> > > Thanks.
> > > Luke
> > >
> > > On Fri, Mar 5, 2021 at 1:50 PM Pushkar Deole 
> > wrote:
> > >
> > > > Hi All,
> > > >
> > > > I was looking for some options to backup a running kafka cluster, for
> > > > disaster recovery requirements. Can someone provide what are the
> > > available
> > > > options to backup and restore a running cluster in case the entire
> > > cluster
> > > > goes down?
> > > >
> > > > Thanks..
> > > >
> > >
> >
>


Re: options for kafka cluster backup?

2021-03-05 Thread Pushkar Deole
Thanks Luke... is MirrorMaker asynchronous? What would the typical lag be
between the replicated cluster and the running cluster, and in case of a
disaster, what are the chances of data loss?

On Fri, Mar 5, 2021 at 11:37 AM Luke Chen  wrote:

> Hi Pushkar,
> MirrorMaker is what you're looking for.
> ref: https://kafka.apache.org/documentation/#georeplication-mirrormaker
>
> Thanks.
> Luke
>
> On Fri, Mar 5, 2021 at 1:50 PM Pushkar Deole  wrote:
>
> > Hi All,
> >
> > I was looking for some options to backup a running kafka cluster, for
> > disaster recovery requirements. Can someone provide what are the
> available
> > options to backup and restore a running cluster in case the entire
> cluster
> > goes down?
> >
> > Thanks..
> >
>


options for kafka cluster backup?

2021-03-04 Thread Pushkar Deole
Hi All,

I was looking for some options to back up a running kafka cluster, for
disaster recovery requirements. Can someone tell me what options are
available to back up and restore a running cluster in case the entire
cluster goes down?

Thanks..


Re: does Kafka exactly-once guarantee also apply to kafka state stores?

2021-01-19 Thread Pushkar Deole
Is there also a way to avoid duplicates if the application consumes from a
kafka topic and writes the events to a database?
E.g. in case the application restarts while processing a batch read from the
topic, with a few events already written to the database: when the application
restarts, those events are consumed again by another instance and written
to the database again.

Could this behavior be avoided somehow without putting constraints on the
database table?
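One common pattern, sketched below under the assumption of a JDBC sink with a
stable event id, is to make the database write idempotent (an upsert), or
alternatively to store the topic/partition/offset in the same database
transaction as the data so a replayed batch can be detected; the table, column
and key names here are hypothetical:

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import org.apache.kafka.clients.consumer.ConsumerRecord;

    public class IdempotentDbWriter {
        // Upsert keyed on the record's unique id: replaying the same event after a
        // restart overwrites the same row instead of inserting a duplicate.
        // (PostgreSQL syntax; table and column names are placeholders.)
        private static final String UPSERT =
                "INSERT INTO events (event_id, payload) VALUES (?, ?) " +
                "ON CONFLICT (event_id) DO UPDATE SET payload = EXCLUDED.payload";

        public void write(Connection conn, ConsumerRecord<String, String> record) throws Exception {
            conn.setAutoCommit(false);
            try (PreparedStatement ps = conn.prepareStatement(UPSERT)) {
                ps.setString(1, record.key());   // assumes the key is a stable unique event id
                ps.setString(2, record.value());
                ps.executeUpdate();
                conn.commit();                   // data (and, if you also track offsets in a
                                                 // table, the offset) become visible atomically
            } catch (Exception e) {
                conn.rollback();
                throw e;
            }
        }
    }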

On Wed, Jan 6, 2021 at 11:18 PM Matthias J. Sax  wrote:

> Well, that is exactly what I mean by "it depends on the state store
> implementation".
>
> For this case, you cannot get exactly-once.
>
> There are actually ideas to improve the implementation to support the
> case you describe, but there is no timeline for this change yet. Not
> even sure if there is already a Jira ticket...
>
>
> -Matthias
>
> On 1/6/21 2:32 AM, Pushkar Deole wrote:
> > The question is if we want to use state store of 3rd party, e.g. say
> Redis,
> > how can the store be consistent with rest of the system i.e. source and
> > destination topics...
> >
> > e.g. record is consumed from source, processed, state store updated with
> > some state, but before writing to destination there is failure
> > Now, in this case, with kafka state store, it will be wiped off the state
> > stored since the transaction failed.
> >
> > But with Redis, the state store is updated with the new state and there
> is
> > no way to revert back
> >
> > On Tue, Jan 5, 2021 at 11:11 PM Matthias J. Sax 
> wrote:
> >
> >> It depends on the store implementation. Atm, EOS for state store is
> >> achieved by re-creating the state store in case of failure from the
> >> changelog topic.
> >>
> >> For RocksDB stores, we wipe out the local state directories and create a
> >> new empty RocksDB and for in-memory stores the content is "lost" anyway
> >> when state is migrated, and we reply the changelog into an empty store
> >> before processing resumes.
> >>
> >>
> >> -Matthias
> >>
> >> On 1/5/21 6:27 AM, Alex Craig wrote:
> >>> I don't think he's asking about data-loss, but rather data consistency.
> >>> (in the event of an exception or application crash, will EOS ensure
> that
> >>> the state store data is consistent)  My understanding is that it DOES
> >> apply
> >>> to state stores as well, in the sense that a failure during processing
> >>> would mean that the commit wouldn't get flushed and therefore wouldn't
> >> get
> >>> double-counted once processing resumes and message is re-processed.
> >>> As far as using something other than RocksDB, I think as long as you
> are
> >>> implementing the state store API correctly you should be fine.  I did a
> >> POC
> >>> recently using Mongo state-stores with EOS enabled and it worked
> >> correctly,
> >>> even when I intentionally introduced failures and crashes.
> >>>
> >>> -alex
> >>>
> >>> On Tue, Jan 5, 2021 at 1:09 AM Ning Zhang 
> >> wrote:
> >>>
> >>>> If there is a "change-log" topic to back up the state store, then it
> may
> >>>> not lose data.
> >>>>
> >>>> Also, if the third party store is not "kafka community certified" (or
> >> not
> >>>> well-maintained), it may have chances to lose data (in different
> ways).
> >>>>
> >>>>
> >>>>
> >>>> On 2021/01/05 04:56:12, Pushkar Deole  wrote:
> >>>>> In case we opt to choose some third party store instead of kafka's
> >> stores
> >>>>> for storing state (e.g. Redis cache or Ignite), then will we lose the
> >>>>> exactly-once guarantee provided by kafka and the state stores can be
> in
> >>>> an
> >>>>> inconsistent state ?
> >>>>>
> >>>>> On Sat, Jan 2, 2021 at 4:56 AM Ning Zhang 
> >>>> wrote:
> >>>>>
> >>>>>> The physical store behind "state store" is change-log kafka topic.
> In
> >>>>>> Kafka stream, if something fails in the middle, the "state store" is
> >>>>>> restored back to the state before the event happens at the first
> step
> >> /
> >>>>>> beginning of the stream.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>&

Re: does Kafka exactly-once guarantee also apply to kafka state stores?

2021-01-06 Thread Pushkar Deole
The question is, if we want to use a 3rd-party state store, e.g. say Redis,
how can the store be kept consistent with the rest of the system, i.e. the source
and destination topics...

e.g. a record is consumed from the source, processed, and the state store updated
with some state, but before writing to the destination there is a failure.
Now, in this case, with a kafka state store, it will be wiped of the state it
stored since the transaction failed.

But with Redis, the state store is updated with the new state and there is
no way to revert it back.

On Tue, Jan 5, 2021 at 11:11 PM Matthias J. Sax  wrote:

> It depends on the store implementation. Atm, EOS for state store is
> achieved by re-creating the state store in case of failure from the
> changelog topic.
>
> For RocksDB stores, we wipe out the local state directories and create a
> new empty RocksDB and for in-memory stores the content is "lost" anyway
> when state is migrated, and we reply the changelog into an empty store
> before processing resumes.
>
>
> -Matthias
>
> On 1/5/21 6:27 AM, Alex Craig wrote:
> > I don't think he's asking about data-loss, but rather data consistency.
> > (in the event of an exception or application crash, will EOS ensure that
> > the state store data is consistent)  My understanding is that it DOES
> apply
> > to state stores as well, in the sense that a failure during processing
> > would mean that the commit wouldn't get flushed and therefore wouldn't
> get
> > double-counted once processing resumes and message is re-processed.
> > As far as using something other than RocksDB, I think as long as you are
> > implementing the state store API correctly you should be fine.  I did a
> POC
> > recently using Mongo state-stores with EOS enabled and it worked
> correctly,
> > even when I intentionally introduced failures and crashes.
> >
> > -alex
> >
> > On Tue, Jan 5, 2021 at 1:09 AM Ning Zhang 
> wrote:
> >
> >> If there is a "change-log" topic to back up the state store, then it may
> >> not lose data.
> >>
> >> Also, if the third party store is not "kafka community certified" (or
> not
> >> well-maintained), it may have chances to lose data (in different ways).
> >>
> >>
> >>
> >> On 2021/01/05 04:56:12, Pushkar Deole  wrote:
> >>> In case we opt to choose some third party store instead of kafka's
> stores
> >>> for storing state (e.g. Redis cache or Ignite), then will we lose the
> >>> exactly-once guarantee provided by kafka and the state stores can be in
> >> an
> >>> inconsistent state ?
> >>>
> >>> On Sat, Jan 2, 2021 at 4:56 AM Ning Zhang 
> >> wrote:
> >>>
> >>>> The physical store behind "state store" is change-log kafka topic. In
> >>>> Kafka stream, if something fails in the middle, the "state store" is
> >>>> restored back to the state before the event happens at the first step
> /
> >>>> beginning of the stream.
> >>>>
> >>>>
> >>>>
> >>>> On 2020/12/31 08:48:16, Pushkar Deole  wrote:
> >>>>> Hi All,
> >>>>>
> >>>>> We use Kafka streams and may need to use exactly-once configuration
> >> for
> >>>>> some of the use cases. Currently, the application uses either local
> >> or
> >>>>> global state store to store state.
> >>>>>  So, the application will consume events from source kafka topic,
> >> process
> >>>>> the events, for state stores it will use either local or global state
> >>>> store
> >>>>> of kafka, then produce events onto the destination topic.
> >>>>>
> >>>>> Question i have is: in the case of exactly-once setting, kafka
> >> streams
> >>>>> guarantees that all actions happen or nothing happens. So, in this
> >> case,
> >>>>> any state stored on the local or global state store will also be
> >> counted
> >>>>> under 'all or nothing' guarantee e.g. if event is consumed and state
> >>>> store
> >>>>> is updated, however some issue occurs before event is produced on
> >>>>> destination topic then will state store be restored back to the state
> >>>>> before it was updated for this event?
> >>>>>
> >>>>
> >>>
> >>
> >
>


Re: does Kafka exactly-once guarantee also apply to kafka state stores?

2021-01-04 Thread Pushkar Deole
In case we opt to choose some third-party store instead of kafka's stores
for storing state (e.g. a Redis cache or Ignite), will we then lose the
exactly-once guarantee provided by kafka, and can the state stores end up in
an inconsistent state?

On Sat, Jan 2, 2021 at 4:56 AM Ning Zhang  wrote:

> The physical store behind "state store" is change-log kafka topic. In
> Kafka stream, if something fails in the middle, the "state store" is
> restored back to the state before the event happens at the first step /
> beginning of the stream.
>
>
>
> On 2020/12/31 08:48:16, Pushkar Deole  wrote:
> > Hi All,
> >
> > We use Kafka streams and may need to use exactly-once configuration for
> > some of the use cases. Currently, the application uses either local or
> > global state store to store state.
> >  So, the application will consume events from source kafka topic, process
> > the events, for state stores it will use either local or global state
> store
> > of kafka, then produce events onto the destination topic.
> >
> > Question i have is: in the case of exactly-once setting, kafka streams
> > guarantees that all actions happen or nothing happens. So, in this case,
> > any state stored on the local or global state store will also be counted
> > under 'all or nothing' guarantee e.g. if event is consumed and state
> store
> > is updated, however some issue occurs before event is produced on
> > destination topic then will state store be restored back to the state
> > before it was updated for this event?
> >
>


does Kafka exactly-once guarantee also apply to kafka state stores?

2020-12-31 Thread Pushkar Deole
Hi All,

We use Kafka streams and may need to use the exactly-once configuration for
some of our use cases. Currently, the application uses either a local or a
global state store to store state.
So, the application will consume events from the source kafka topic, process
the events, use either a local or global kafka state store for its state,
and then produce events onto the destination topic.

The question I have is: with the exactly-once setting, kafka streams
guarantees that all actions happen or nothing happens. So, in this case,
will any state stored in the local or global state store also be counted
under the 'all or nothing' guarantee? E.g. if an event is consumed and the state
store is updated, but some issue occurs before the event is produced on the
destination topic, will the state store be restored back to the state
it had before it was updated for this event?
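For reference, exactly-once in Streams is switched on through the
processing.guarantee config; a minimal sketch (application id, servers and the
topology are placeholders; newer versions prefer the value "exactly_once_v2"):

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;

    public class EosStreamsApp {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            // consume -> state store update (via changelog) -> produce are committed
            // as one transaction per commit interval ("exactly_once_v2" on newer versions)
            props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

            StreamsBuilder builder = new StreamsBuilder();
            builder.stream("source-topic").to("destination-topic");  // placeholder topology

            new KafkaStreams(builder.build(), props).start();
        }
    }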


Re: KIP to Gracefully handle timeout exception on kafka streams

2020-12-11 Thread Pushkar Deole
Matthias,

By the way, one more of our services recently encountered this exception;
can you suggest whether this can also be avoided by tuning any specific
configuration?

{"@timestamp":"2020-11-24T13:33:38.617+00:00","@version":"1","message":"Exception
processing processor thread -
analytics-event-normalizer-d6aeedd6-f53b-4e20-91f6-ee4091041006-StreamThread-3
stream - org.apache.kafka.common.errors.TimeoutException: Timeout of
6ms expired before the position for partition engagement-18 could be
determined","logger_name":"com.avaya.analytics.kafka.topology.EventNormalizerTopology","thread_name":"analytics-event-normalizer-d6aeedd6-f53b-4e20-91f6-ee4091041006-StreamThread-3","level":"ERROR","level_value":4,"stack_trace":"java.lang.IllegalStateException:
org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired
before the position for partition engagement-18 could be determined\n\tat
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:510)\n\tat
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:478)\n\tat
org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:226)\n\tat
org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:543)\n\tat
org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:977)\n\tat
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:823)\n\tat
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)\n\tat
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)\nCaused
by: org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms
expired before the position for partition engagement-18 could be
determined\n"}
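In case it helps, the reply quoted below points at max.block.ms and
default.api.timeout.ms; a sketch of how such client overrides are passed through
the Streams config (the values are only examples, not recommendations):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class TimeoutOverrides {
        public static Properties apply(Properties props) {
            // Blocking calls of the embedded consumers (e.g. position()/committed() lookups).
            props.put(StreamsConfig.consumerPrefix("default.api.timeout.ms"), "120000");
            // Blocking calls of the embedded producer (metadata fetch, transactions, full buffer).
            props.put(StreamsConfig.producerPrefix("max.block.ms"), "120000");
            return props;
        }
    }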



On Sun, Nov 22, 2020 at 12:35 AM Matthias J. Sax  wrote:

> KIP-572 will only ship in 2.8.0.
>
> For the exception you hit, it's `max.block.ms` -- you might also look
> into `default.api.timeout.ms`.
>
> In general, the relevant configs are documented in the JavaDocs of the
> corresponding client method.
>
>
> -Matthias
>
> On 11/20/20 9:11 PM, Pushkar Deole wrote:
> > Thanks Matthias... We are already on kafka 2.5.0, and
> > https://issues.apache.org/jira/browse/KAFKA-8803  mentions that these
> type
> > of issues are fixed in 2.5.0
> >
> > Is KIP-572 planned for 2.7.0 ?
> >
> > Also, for timeout and retries, can you provide which parameters should we
> > configure to higher values for now?
> >
> >
> > On Sat, Nov 21, 2020 at 5:15 AM Matthias J. Sax 
> wrote:
> >
> >> Yes, if brokers are upgraded via rolling bounce, and the embedded
> >> clients are configured with large enough timeouts and retries, they
> >> should just fail over to running brokers if a single broker is bounced.
> >>
> >> If you get a timeout exception, than KafkaStreams dies atm -- we have
> >> KIP-572 in-flight that will improve the situation by adding one more
> >> retry layer within KafkaStreams itself. For now, you would need to
> >> increase the corresponding client timeouts to avoid that the client
> >> throws a timeout exception.
> >>
> >> There is however https://issues.apache.org/jira/browse/KAFKA-8803 that
> >> you could have hit, too.
> >>
> >>
> >> -Matthias
> >>
> >> On 11/18/20 7:05 AM, Pushkar Deole wrote:
> >>> Matthias,
> >>>
> >>> We recently ran into an issue where kafka brokers upgraded (i guess it
> >> was
> >>> rolling update) for Aiven business plan 4 to plan 8. This involves
> change
> >>> to cpu, memory and storage for each broker.
> >>>
> >>> Since this should be rolling upgrade, we expected services to survive,
> >>> however in one service we saw streams ran into Error with below
> >> exception:
> >>> Few questions:
> >>> 1. If a broker goes down, the kafka streams client should handle
> >> internally
> >>> and connect to available broker since we have topic with replicas equal
> >> to
> >>> no. of brokers. Is this correct?
> >>> 2. the below error says timeout expired while awaiting InitProducerId..
> >>> what does this signify and why would this timeout occur when there will
> >> be
> >>> other brokers up and running?
> >>>
> >>>
> >>>
> >>
> {"@timestamp":"2020-11-16T13:42:31.110+00:00","@versio

Re: multi-threaded consumer configuration like stream threads?

2020-11-23 Thread Pushkar Deole
I think we can handle the failures selectively. E.g. if there are issues
with the downstream database server, then all the events will fail to process,
so it will be worth continuing to retry. Whereas if there is an issue only while
processing a particular event, then we can keep a retry timeout, and after
that timeout it will take another event for processing.
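A rough sketch of that selective handling with a dead-letter topic (the topic
name, retry policy and the provision() call are assumptions):

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class FailureHandler {
        private static final String DEAD_LETTER_TOPIC = "provisioning-events.dlq";  // placeholder name
        private static final int MAX_ATTEMPTS = 3;

        private final KafkaProducer<String, String> producer;

        public FailureHandler(KafkaProducer<String, String> producer) {
            this.producer = producer;
        }

        public void process(ConsumerRecord<String, String> record) {
            for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
                try {
                    provision(record.value());  // application-specific processing
                    return;
                } catch (Exception e) {
                    if (attempt == MAX_ATTEMPTS) {
                        // Park the event on a dead-letter topic so a separate consumer
                        // can retry it later without blocking this partition.
                        producer.send(new ProducerRecord<>(DEAD_LETTER_TOPIC, record.key(), record.value()));
                    }
                }
            }
        }

        private void provision(String payload) {
            // placeholder for the real database provisioning logic
        }
    }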

On Tue, Nov 24, 2020 at 5:35 AM Haruki Okada  wrote:

> I see.
>
> Then I think the appropriate approach depends on your delivery latency
> requirements.
> Just retrying until success is simpler but it could block subsequent
> messages to get processed. (also depends on thread pool size though)
>
> Then another concern when using dead letter topic would be retrying
> backoff.
> If you don't use backoff, the production for dead letter topic could burst
> when downstream db experiences transient problems but on the other hand
> injecting backoff-delay would require consideration about how to not block
> subsequent messages.
>
> (FYI, Decaton provides retry-queueing with backoff out-of-the box. :)
> https://github.com/line/decaton/blob/master/docs/retry-queueing.adoc)
>
> 2020年11月24日(火) 2:38 Pushkar Deole :
>
> > Thanks Haruki... right now the max of such types of events that we would
> > have is 100 since we would be supporting those many customers (accounts)
> > for now, for which we are considering a simple approach of a single
> > consumer and a thread pool with around 10 threads. So the question was
> > regarding how to manage failed events, should those be retried until
> > successful or sent to a dead letter queue/topic from where they will be
> > processed again until successful.
> >
> >
> > On Mon, Nov 23, 2020 at 10:16 PM Haruki Okada 
> wrote:
> >
> > > Hi Pushkar.
> > >
> > > Just for your information, https://github.com/line/decaton is a Kafka
> > > consumer framework that supports parallel processing per single
> > partition.
> > >
> > > It manages committable (i.e. the offset that all preceding offsets have
> > > been processed) offset internally so that preserves at-least-once
> > semantics
> > > even when processing in parallel.
> > >
> > >
> > > 2020年11月24日(火) 1:16 Pushkar Deole :
> > >
> > > > Thanks Liam!
> > > > We don't have a requirement to maintain order of processing for
> events
> > > even
> > > > within a partition. Essentially, these are events for various
> accounts
> > > > (customers) that we want to support and do necessary database
> > > provisioning
> > > > for those in our database. So they can be processed in parallel.
> > > >
> > > > I think the 2nd option would suit our requirement to have a single
> > > consumer
> > > > and a bound thread pool for processors. However, the issue we may
> face
> > is
> > > > to commit the offsets only after processing an event since we don't
> > want
> > > > the consumer to auto commit offsets before the provisioning done for
> > the
> > > > customer. How can that be achieved with model #2  ?
> > > >
> > > > On Tue, Oct 27, 2020 at 2:50 PM Liam Clarke-Hutchinson <
> > > > liam.cla...@adscale.co.nz> wrote:
> > > >
> > > > > Hi Pushkar,
> > > > >
> > > > > No. You'd need to combine a consumer with a thread pool or similar
> as
> > > you
> > > > > prefer. As the docs say (from
> > > > >
> > > > >
> > > >
> > >
> >
> https://kafka.apache.org/26/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> > > > > )
> > > > >
> > > > > We have intentionally avoided implementing a particular threading
> > model
> > > > for
> > > > > > processing. This leaves several options for implementing
> > > multi-threaded
> > > > > > processing of records.
> > > > > > 1. One Consumer Per Thread
> > > > > > A simple option is to give each thread its own consumer instance.
> > > Here
> > > > > are
> > > > > > the pros and cons of this approach:
> > > > > >
> > > > > >- *PRO*: It is the easiest to implement
> > > > > >
> > > > > >
> > > > > >- *PRO*: It is often the fastest as no inter-thread
> > co-ordination
> > > is
> > > > > >needed
> > > > > >
> > > >

Re: multi-threaded consumer configuration like stream threads?

2020-11-23 Thread Pushkar Deole
Thanks Haruki... right now the maximum number of such events that we would
have is 100, since we would be supporting that many customers (accounts)
for now, for which we are considering a simple approach of a single
consumer and a thread pool with around 10 threads. So the question was
about how to manage failed events: should they be retried until
successful, or sent to a dead-letter queue/topic from where they will be
processed again until successful?


On Mon, Nov 23, 2020 at 10:16 PM Haruki Okada  wrote:

> Hi Pushkar.
>
> Just for your information, https://github.com/line/decaton is a Kafka
> consumer framework that supports parallel processing per single partition.
>
> It manages committable (i.e. the offset that all preceding offsets have
> been processed) offset internally so that preserves at-least-once semantics
> even when processing in parallel.
>
>
> 2020年11月24日(火) 1:16 Pushkar Deole :
>
> > Thanks Liam!
> > We don't have a requirement to maintain order of processing for events
> even
> > within a partition. Essentially, these are events for various accounts
> > (customers) that we want to support and do necessary database
> provisioning
> > for those in our database. So they can be processed in parallel.
> >
> > I think the 2nd option would suit our requirement to have a single
> consumer
> > and a bound thread pool for processors. However, the issue we may face is
> > to commit the offsets only after processing an event since we don't want
> > the consumer to auto commit offsets before the provisioning done for the
> > customer. How can that be achieved with model #2  ?
> >
> > On Tue, Oct 27, 2020 at 2:50 PM Liam Clarke-Hutchinson <
> > liam.cla...@adscale.co.nz> wrote:
> >
> > > Hi Pushkar,
> > >
> > > No. You'd need to combine a consumer with a thread pool or similar as
> you
> > > prefer. As the docs say (from
> > >
> > >
> >
> https://kafka.apache.org/26/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> > > )
> > >
> > > We have intentionally avoided implementing a particular threading model
> > for
> > > > processing. This leaves several options for implementing
> multi-threaded
> > > > processing of records.
> > > > 1. One Consumer Per Thread
> > > > A simple option is to give each thread its own consumer instance.
> Here
> > > are
> > > > the pros and cons of this approach:
> > > >
> > > >- *PRO*: It is the easiest to implement
> > > >
> > > >
> > > >- *PRO*: It is often the fastest as no inter-thread co-ordination
> is
> > > >needed
> > > >
> > > >
> > > >- *PRO*: It makes in-order processing on a per-partition basis
> very
> > > >easy to implement (each thread just processes messages in the
> order
> > it
> > > >receives them).
> > > >
> > > >
> > > >- *CON*: More consumers means more TCP connections to the cluster
> > (one
> > > >per thread). In general Kafka handles connections very efficiently
> > so
> > > this
> > > >is generally a small cost.
> > > >
> > > >
> > > >- *CON*: Multiple consumers means more requests being sent to the
> > > >server and slightly less batching of data which can cause some
> drop
> > > in I/O
> > > >throughput.
> > > >
> > > >
> > > >- *CON*: The number of total threads across all processes will be
> > > >limited by the total number of partitions.
> > > >
> > > > 2. Decouple Consumption and Processing
> > > > Another alternative is to have one or more consumer threads that do
> all
> > > > data consumption and hands off ConsumerRecords
> > > > <
> > >
> >
> https://kafka.apache.org/26/javadoc/org/apache/kafka/clients/consumer/ConsumerRecords.html
> > >
> > > instances
> > > > to a blocking queue consumed by a pool of processor threads that
> > actually
> > > > handle the record processing. This option likewise has pros and cons:
> > > >
> > > >- *PRO*: This option allows independently scaling the number of
> > > >consumers and processors. This makes it possible to have a single
> > > consumer
> > > >that feeds many processor threads, avoiding any limitation on
> > > partitions.
> > > >
>

Re: multi-threaded consumer configuration like stream threads?

2020-11-23 Thread Pushkar Deole
Thanks Liam!
We don't have a requirement to maintain the order of processing for events,
even within a partition. Essentially, these are events for the various
accounts (customers) that we want to support, for which we do the necessary
provisioning in our database. So they can be processed in parallel.

I think the 2nd option would suit our requirement to have a single consumer
and a bounded thread pool for processors. However, the issue we may face is
committing the offsets only after processing an event, since we don't want
the consumer to auto-commit offsets before the provisioning is done for the
customer. How can that be achieved with model #2?

On Tue, Oct 27, 2020 at 2:50 PM Liam Clarke-Hutchinson <
liam.cla...@adscale.co.nz> wrote:

> Hi Pushkar,
>
> No. You'd need to combine a consumer with a thread pool or similar as you
> prefer. As the docs say (from
>
> https://kafka.apache.org/26/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> )
>
> We have intentionally avoided implementing a particular threading model for
> > processing. This leaves several options for implementing multi-threaded
> > processing of records.
> > 1. One Consumer Per Thread
> > A simple option is to give each thread its own consumer instance. Here
> are
> > the pros and cons of this approach:
> >
> >- *PRO*: It is the easiest to implement
> >
> >
> >- *PRO*: It is often the fastest as no inter-thread co-ordination is
> >needed
> >
> >
> >- *PRO*: It makes in-order processing on a per-partition basis very
> >easy to implement (each thread just processes messages in the order it
> >receives them).
> >
> >
> >- *CON*: More consumers means more TCP connections to the cluster (one
> >per thread). In general Kafka handles connections very efficiently so
> this
> >is generally a small cost.
> >
> >
> >- *CON*: Multiple consumers means more requests being sent to the
> >server and slightly less batching of data which can cause some drop
> in I/O
> >throughput.
> >
> >
> >- *CON*: The number of total threads across all processes will be
> >limited by the total number of partitions.
> >
> > 2. Decouple Consumption and Processing
> > Another alternative is to have one or more consumer threads that do all
> > data consumption and hands off ConsumerRecords
> > <
> https://kafka.apache.org/26/javadoc/org/apache/kafka/clients/consumer/ConsumerRecords.html>
> instances
> > to a blocking queue consumed by a pool of processor threads that actually
> > handle the record processing. This option likewise has pros and cons:
> >
> >- *PRO*: This option allows independently scaling the number of
> >consumers and processors. This makes it possible to have a single
> consumer
> >that feeds many processor threads, avoiding any limitation on
> partitions.
> >
> >
> >- *CON*: Guaranteeing order across the processors requires particular
> >care as the threads will execute independently an earlier chunk of
> data may
> >actually be processed after a later chunk of data just due to the
> luck of
> >thread execution timing. For processing that has no ordering
> requirements
> >this is not a problem.
> >
> >
> >- *CON*: Manually committing the position becomes harder as it
> >requires that all threads co-ordinate to ensure that processing is
> complete
> >for that partition.
> >
> > There are many possible variations on this approach. For example each
> > processor thread can have its own queue, and the consumer threads can
> hash
> > into these queues using the TopicPartition to ensure in-order consumption
> > and simplify commit.
>
>
> Cheers,
>
> Liam Clarke-Hutchinson
>
> On Tue, Oct 27, 2020 at 8:04 PM Pushkar Deole 
> wrote:
>
> > Hi,
> >
> > Is there any configuration in kafka consumer to specify multiple threads
> > the way it is there in kafka streams?
> > Essentially, can we have a consumer with multiple threads where the
> threads
> > would divide partitions of topic among them?
> >
>


Re: KIP to Gracefully handle timeout exception on kafka streams

2020-11-20 Thread Pushkar Deole
Thanks Matthias... We are already on kafka 2.5.0, and
https://issues.apache.org/jira/browse/KAFKA-8803 mentions that these types
of issues are fixed in 2.5.0.

Is KIP-572 planned for 2.7.0?

Also, for timeouts and retries, can you tell us which parameters we should
configure to higher values for now?
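
For reference, and only as a sketch: before KIP-572, the knobs in question
are the embedded clients' own timeout/retry settings, passed through Kafka
Streams with the producer./consumer. prefixes. The values below are
illustrative, not recommendations; the right numbers depend on how long a
broker bounce takes in the environment.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

import java.util.Properties;

// Illustrative client-timeout overrides for a Kafka Streams app (example values only).
public class TimeoutOverrides {
    public static Properties apply(Properties props) {
        // Producer side: initTransactions()/InitProducerId blocks for at most max.block.ms.
        props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), 180000);
        props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), 60000);
        props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), 1000);
        // Consumer side: give blocking consumer calls more headroom during a rolling bounce.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG), 120000);
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), 60000);
        return props;
    }
}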


On Sat, Nov 21, 2020 at 5:15 AM Matthias J. Sax  wrote:

> Yes, if brokers are upgraded via rolling bounce, and the embedded
> clients are configured with large enough timeouts and retries, they
> should just fail over to running brokers if a single broker is bounced.
>
> If you get a timeout exception, than KafkaStreams dies atm -- we have
> KIP-572 in-flight that will improve the situation by adding one more
> retry layer within KafkaStreams itself. For now, you would need to
> increase the corresponding client timeouts to avoid that the client
> throws a timeout exception.
>
> There is however https://issues.apache.org/jira/browse/KAFKA-8803 that
> you could have hit, too.
>
>
> -Matthias
>
> On 11/18/20 7:05 AM, Pushkar Deole wrote:
> > Matthias,
> >
> > We recently ran into an issue where kafka brokers upgraded (i guess it
> was
> > rolling update) for Aiven business plan 4 to plan 8. This involves change
> > to cpu, memory and storage for each broker.
> >
> > Since this should be rolling upgrade, we expected services to survive,
> > however in one service we saw streams ran into Error with below
> exception:
> > Few questions:
> > 1. If a broker goes down, the kafka streams client should handle
> internally
> > and connect to available broker since we have topic with replicas equal
> to
> > no. of brokers. Is this correct?
> > 2. the below error says timeout expired while awaiting InitProducerId..
> > what does this signify and why would this timeout occur when there will
> be
> > other brokers up and running?
> >
> >
> >
> {"@timestamp":"2020-11-16T13:42:31.110+00:00","@version":"1","message":"Unexpected
> > exception in stream
> >
> processing.","logger_name":"com.avaya.analytics.filter.ApplicationConfig","thread_name":"analytics-event-filter-StreamThread-1","level":"ERROR","level_value":4,"stack_trace":"org.apache.kafka.streams.errors.StreamsException:
> > stream-thread [analytics-event-filter-StreamThread-1] Failed to
> > rebalance.\n\tat
> >
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:862)\n\tat
> >
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)\n\tat
> >
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)\n\tat
> >
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)\nCaused
> > by: org.apache.kafka.streams.errors.StreamsException: stream-thread
> > [analytics-event-filter-StreamThread-1] task [1_0] Failed to initialize
> > task 1_0 due to timeout.\n\tat
> >
> org.apache.kafka.streams.processor.internals.StreamTask.initializeTransactions(StreamTask.java:923)\n\tat
> >
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:206)\n\tat
> >
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:115)\n\tat
> >
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:352)\n\tat
> >
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:310)\n\tat
> >
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:295)\n\tat
> >
> org.apache.kafka.streams.processor.internals.TaskManager.addNewActiveTasks(TaskManager.java:160)\n\tat
> >
> org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:120)\n\tat
> >
> org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:77)\n\tat
> >
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:278)\n\tat
> >
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:419)\n\tat
> >
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:439)\n\tat
> >
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)\n\tat
> >
> org.apache.kafka.clients.

KIP to Gracefully handle timeout exception on kafka streams

2020-11-18 Thread Pushkar Deole
Matthias,

We recently ran into an issue where the kafka brokers were upgraded (I guess
it was a rolling update) from Aiven business plan 4 to plan 8. This involves
a change to CPU, memory and storage for each broker.

Since this should be a rolling upgrade, we expected services to survive;
however, in one service we saw streams run into an error with the below exception.
Few questions:
1. If a broker goes down, the kafka streams client should handle it internally
and connect to an available broker, since we have topics with replicas equal to
the number of brokers. Is this correct?
2. The below error says the timeout expired while awaiting InitProducerId.
What does this signify, and why would this timeout occur when there will be
other brokers up and running?


{"@timestamp":"2020-11-16T13:42:31.110+00:00","@version":"1","message":"Unexpected
exception in stream
processing.","logger_name":"com.avaya.analytics.filter.ApplicationConfig","thread_name":"analytics-event-filter-StreamThread-1","level":"ERROR","level_value":4,"stack_trace":"org.apache.kafka.streams.errors.StreamsException:
stream-thread [analytics-event-filter-StreamThread-1] Failed to
rebalance.\n\tat
org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:862)\n\tat
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)\n\tat
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)\n\tat
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)\nCaused
by: org.apache.kafka.streams.errors.StreamsException: stream-thread
[analytics-event-filter-StreamThread-1] task [1_0] Failed to initialize
task 1_0 due to timeout.\n\tat
org.apache.kafka.streams.processor.internals.StreamTask.initializeTransactions(StreamTask.java:923)\n\tat
org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:206)\n\tat
org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:115)\n\tat
org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:352)\n\tat
org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:310)\n\tat
org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:295)\n\tat
org.apache.kafka.streams.processor.internals.TaskManager.addNewActiveTasks(TaskManager.java:160)\n\tat
org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:120)\n\tat
org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:77)\n\tat
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:278)\n\tat
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:419)\n\tat
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:439)\n\tat
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)\n\tat
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:497)\n\tat
org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1274)\n\tat
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)\n\tat
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1173)\n\tat
brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:89)\n\tat
brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:83)\n\tat
org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:853)\n\t...
3 common frames omitted\nCaused by:
org.apache.kafka.common.errors.TimeoutException: Timeout expired after
60000 milliseconds while awaiting InitProducerId\n"}


multi-threaded consumer configuration like stream threads?

2020-10-27 Thread Pushkar Deole
Hi,

Is there any configuration in the kafka consumer to specify multiple threads,
the way there is in kafka streams?
Essentially, can we have a consumer with multiple threads where the threads
would divide the partitions of a topic among them?


Stream stopped running due to TimeoutException: Timeout expired after 60000milliseconds while awaiting InitProducerId

2020-10-19 Thread Pushkar Deole
Hi All,

Recently we moved kafka to an azure lab and we are seeing these exceptions
quite often. Strangely, only some of the stream apps get this error and stop
working, while other stream apps run fine.
I came across this issue on the kafka issues list,
https://issues.apache.org/jira/browse/KAFKA-8803, and the problem we are
facing matches it to a great extent, along with all the symptoms we are experiencing.

Sophie, one of the comments says that you are leading the investigation here,
so do you have any suggestions?

We are using kafka broker 2.5.0
kafka client and streams are 2.5.1


Re: Kafka streams parallelism - why not separate stream task per partition per input topic

2020-10-14 Thread Pushkar Deole
Matthias,

Any recommendations?

Also, while doing a performance test, I observed that the partitions assigned
to the stream threads are changing. Why would this happen when the instances
are not going down?

E.g., I see partitions being moved between stream thread consumers. The values
highlighted in bold are the partition numbers of the input topic; when I
checked a few hours ago, StreamThread-12 showed different partition numbers
and even different topics:

GROUP                   TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                                                            HOST            CLIENT-ID
analytics-event-filter  analytics-engagement           *13*       308973          308980          7    analytics-event-filter-StreamThread-12-consumer-965a5a1c-e3f0-4d4e-bc54-1e4e375d2ac8  /10.200.27.207  analytics-event-filter-StreamThread-12-consumer
analytics-event-filter  analytics-agent-account-state  *14*       88053           88057           4    analytics-event-filter-StreamThread-12-consumer-965a5a1c-e3f0-4d4e-bc54-1e4e375d2ac8  /10.200.27.207  analytics-event-filter-StreamThread-12-consumer
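
As a side note (not from the original mail): the same information can be
read from inside the application with the Kafka Streams 2.5 API, which
avoids guessing from the consumer-group tool output. A minimal sketch,
assuming `streams` is the running KafkaStreams instance:

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.streams.processor.ThreadMetadata;

// Logs which tasks (and therefore which topic-partitions) each stream thread currently owns.
public class AssignmentLogger {
    public static void logAssignment(KafkaStreams streams) {
        for (ThreadMetadata thread : streams.localThreadsMetadata()) {
            System.out.println(thread.threadName() + " (" + thread.threadState() + ")");
            for (TaskMetadata task : thread.activeTasks()) {
                System.out.println("  task " + task.taskId() + " -> " + task.topicPartitions());
            }
        }
    }
}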

On Fri, Oct 9, 2020 at 8:20 AM Pushkar Deole  wrote:

> I looked at the task assignment and it looked random for some threads:
> e.g. i have 3 topics 24 partitions each and have 3 instances of
> application. So, each instance assigned 8 partitions per topic, i.e. total
> 24 partitions for 3 topics.
>
> When I set 8 stream threads, I expected each thread to be assigned 1
> partition from each topic, however some of the threads got assigned
> partitions only from 2 of the topics.
> Since topic C is not carrying traffic, those threads that did not get
> assigned partition from topic C got overloaded than others.
>
> Topic A
> Topic
>
> On Wed, Oct 7, 2020 at 11:45 PM Matthias J. Sax  wrote:
>
>> Well, there are many what-ifs and I am not sure if there is general
>> advice.
>>
>> Maybe a more generic response: do you actually observe a concrete issue
>> with the task assignment that impacts your app measurable? Or might this
>> be a case of premature optimization?
>>
>> -Matthias
>>
>> On 10/6/20 10:13 AM, Pushkar Deole wrote:
>> > So, what do you suggest to address the topic C with lesser traffic?
>> Should
>> > we create a separate StreamBuilder and build a separate topology for
>> topic
>> > C so we can configure number of threads as per our requirement for that
>> > topic?
>> >
>> > On Tue, Oct 6, 2020 at 10:27 PM Matthias J. Sax 
>> wrote:
>> >
>> >> The current assignment would be "round robin". Ie, after all tasks are
>> >> created, we just take task-by-task and assign one to the threads
>> >> one-by-one.
>> >>
>> >> Note though, that the assignment algorithm might change at any point,
>> so
>> >> you should not rely on it.
>> >>
>> >> We are also not able to know if one topic has less traffic than others
>> >> and thus must blindly assume (what is of course a simplification) that
>> >> all topics have the same traffic. We only consider the difference
>> >> between stateless and stateful tasks atm.
>> >>
>> >> -Matthias
>> >>
>> >> On 10/6/20 3:57 AM, Pushkar Deole wrote:
>> >>> Matthias,
>> >>>
>> >>> I am just wondering how the tasks will be spread across threads in
>> case I
>> >>> have lesser threads than the number of partitions. Specifically
>> taking my
>> >>> use case, I have 3 inputs topics with 8 partitions each and I can
>> >> configure
>> >>> 12 threads, so how below topics partitions will be distributed among
>> 12
>> >>> threads.
>> >>> Note that topic C is generally idle and carries traffic only
>> sometimes,
>> >> so
>> >>> I would want partitions from topic C to be evenly distributed so all
>> >>> partitions from topic C don't get assigned to only some of the
>> threads.
>> >>>
>> >>> Topic A - 8 partitions
>> >>> Topic B - 8 partitions
>> >>> Topic C - 8 partitions
>> >>>
>> >>> On Fri, Sep 4, 2020 at 9:56 PM Matthias J. Sax 
>> wrote:
>> >>>
>> >>>> That is correct.
>> >>>>
>> >>>> If topicA has 5 partitions and topicB has 6 partitions, you get 5
>> tasks
>> >>>> for the first sub-topology and 6 tasks for the second sub-topology
>> and
>> >>>> you can run up to 11 threads, each executing one task.
>> >>>>
>> >>>>
>> >>>> -Matthias
>> >>>>
>> >>>> On 9/4/20 1:30 AM, Pushkar Deole wrote:

Re: Kafka streams parallelism - why not separate stream task per partition per input topic

2020-10-08 Thread Pushkar Deole
I looked at the task assignment and it looked random for some threads: e.g.
I have 3 topics with 24 partitions each and 3 instances of the application. So,
each instance is assigned 8 partitions per topic, i.e. 24 partitions in total
across the 3 topics.

When I set 8 stream threads, I expected each thread to be assigned 1
partition from each topic; however, some of the threads got assigned
partitions from only 2 of the topics.
Since topic C is not carrying traffic, the threads that did not get
assigned a partition from topic C ended up more loaded than the others.

Topic A
Topic

On Wed, Oct 7, 2020 at 11:45 PM Matthias J. Sax  wrote:

> Well, there are many what-ifs and I am not sure if there is general advice.
>
> Maybe a more generic response: do you actually observe a concrete issue
> with the task assignment that impacts your app measurable? Or might this
> be a case of premature optimization?
>
> -Matthias
>
> On 10/6/20 10:13 AM, Pushkar Deole wrote:
> > So, what do you suggest to address the topic C with lesser traffic?
> Should
> > we create a separate StreamBuilder and build a separate topology for
> topic
> > C so we can configure number of threads as per our requirement for that
> > topic?
> >
> > On Tue, Oct 6, 2020 at 10:27 PM Matthias J. Sax 
> wrote:
> >
> >> The current assignment would be "round robin". Ie, after all tasks are
> >> created, we just take task-by-task and assign one to the threads
> >> one-by-one.
> >>
> >> Note though, that the assignment algorithm might change at any point, so
> >> you should not rely on it.
> >>
> >> We are also not able to know if one topic has less traffic than others
> >> and thus must blindly assume (what is of course a simplification) that
> >> all topics have the same traffic. We only consider the difference
> >> between stateless and stateful tasks atm.
> >>
> >> -Matthias
> >>
> >> On 10/6/20 3:57 AM, Pushkar Deole wrote:
> >>> Matthias,
> >>>
> >>> I am just wondering how the tasks will be spread across threads in
> case I
> >>> have lesser threads than the number of partitions. Specifically taking
> my
> >>> use case, I have 3 inputs topics with 8 partitions each and I can
> >> configure
> >>> 12 threads, so how below topics partitions will be distributed among 12
> >>> threads.
> >>> Note that topic C is generally idle and carries traffic only sometimes,
> >> so
> >>> I would want partitions from topic C to be evenly distributed so all
> >>> partitions from topic C don't get assigned to only some of the threads.
> >>>
> >>> Topic A - 8 partitions
> >>> Topic B - 8 partitions
> >>> Topic C - 8 partitions
> >>>
> >>> On Fri, Sep 4, 2020 at 9:56 PM Matthias J. Sax 
> wrote:
> >>>
> >>>> That is correct.
> >>>>
> >>>> If topicA has 5 partitions and topicB has 6 partitions, you get 5
> tasks
> >>>> for the first sub-topology and 6 tasks for the second sub-topology and
> >>>> you can run up to 11 threads, each executing one task.
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 9/4/20 1:30 AM, Pushkar Deole wrote:
> >>>>> Matthias,
> >>>>>
> >>>>> Let's say we have independent sub topologies like: in this case, will
> >> the
> >>>>> streams create tasks equal to the total number of partitions from
> >> topicA
> >>>>> and topicB, and can we assign stream thread count that is sum of the
> >>>>> partition of the two topics?
> >>>>>
> >>>>> builder.stream("topicA").filter().to();
> >>>>> builder.stream("topicB").filter().to();
> >>>>>
> >>>>> On Thu, Sep 3, 2020 at 8:28 PM Matthias J. Sax 
> >> wrote:
> >>>>>
> >>>>>> Well, it depends on your program.
> >>>>>>
> >>>>>> The reason for the current task creating strategy are joins: If you
> >> have
> >>>>>> two input topic that you want to join, the join happens on a
> >>>>>> per-partition basis, ie, topic-p0 is joined to topicB-p0 etc and
> thus
> >>>>>> both partitions must be assigned to the same task (to get
> >> co-partitioned
> >>>>>> data processed together).
> >>>>>>
>

Re: Kafka streams parallelism - why not separate stream task per partition per input topic

2020-10-06 Thread Pushkar Deole
So, what do you suggest to address topic C, which has less traffic? Should
we create a separate StreamsBuilder and build a separate topology for topic
C, so we can configure the number of threads as per our requirement for that
topic?
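
To make that idea concrete (a sketch only; topic names, application ids and
thread counts are made up): the low-traffic topic can be built into its own
topology and run as a second KafkaStreams instance, whose num.stream.threads
is set independently of the main topology. The second instance must use a
different application.id, which also means its own consumer group and
internal topics.

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

import java.util.Properties;

public class SeparateTopologyForTopicC {
    public static void main(String[] args) {
        // Main topology: topics A and B, sized for the busy traffic.
        StreamsBuilder mainBuilder = new StreamsBuilder();
        mainBuilder.stream("topicA").filter((k, v) -> v != null).to("outA");
        mainBuilder.stream("topicB").filter((k, v) -> v != null).to("outB");
        KafkaStreams mainStreams = new KafkaStreams(mainBuilder.build(), props("main-app", 12));

        // Separate topology for the mostly idle topic C, with its own thread count.
        StreamsBuilder cBuilder = new StreamsBuilder();
        cBuilder.stream("topicC").filter((k, v) -> v != null).to("outC");
        KafkaStreams cStreams = new KafkaStreams(cBuilder.build(), props("topic-c-app", 2));

        mainStreams.start();
        cStreams.start();
    }

    private static Properties props(String appId, int threads) {
        Properties p = new Properties();
        p.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);      // must differ per instance
        p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        p.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, threads);
        return p;
    }
}

Running both topologies in one application also works; splitting into two
instances is just what lets the thread count for topic C be tuned separately.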

On Tue, Oct 6, 2020 at 10:27 PM Matthias J. Sax  wrote:

> The current assignment would be "round robin". Ie, after all tasks are
> created, we just take task-by-task and assign one to the threads
> one-by-one.
>
> Note though, that the assignment algorithm might change at any point, so
> you should not rely on it.
>
> We are also not able to know if one topic has less traffic than others
> and thus must blindly assume (what is of course a simplification) that
> all topics have the same traffic. We only consider the difference
> between stateless and stateful tasks atm.
>
> -Matthias
>
> On 10/6/20 3:57 AM, Pushkar Deole wrote:
> > Matthias,
> >
> > I am just wondering how the tasks will be spread across threads in case I
> > have lesser threads than the number of partitions. Specifically taking my
> > use case, I have 3 inputs topics with 8 partitions each and I can
> configure
> > 12 threads, so how below topics partitions will be distributed among 12
> > threads.
> > Note that topic C is generally idle and carries traffic only sometimes,
> so
> > I would want partitions from topic C to be evenly distributed so all
> > partitions from topic C don't get assigned to only some of the threads.
> >
> > Topic A - 8 partitions
> > Topic B - 8 partitions
> > Topic C - 8 partitions
> >
> > On Fri, Sep 4, 2020 at 9:56 PM Matthias J. Sax  wrote:
> >
> >> That is correct.
> >>
> >> If topicA has 5 partitions and topicB has 6 partitions, you get 5 tasks
> >> for the first sub-topology and 6 tasks for the second sub-topology and
> >> you can run up to 11 threads, each executing one task.
> >>
> >>
> >> -Matthias
> >>
> >> On 9/4/20 1:30 AM, Pushkar Deole wrote:
> >>> Matthias,
> >>>
> >>> Let's say we have independent sub topologies like: in this case, will
> the
> >>> streams create tasks equal to the total number of partitions from
> topicA
> >>> and topicB, and can we assign stream thread count that is sum of the
> >>> partition of the two topics?
> >>>
> >>> builder.stream("topicA").filter().to();
> >>> builder.stream("topicB").filter().to();
> >>>
> >>> On Thu, Sep 3, 2020 at 8:28 PM Matthias J. Sax 
> wrote:
> >>>
> >>>> Well, it depends on your program.
> >>>>
> >>>> The reason for the current task creating strategy are joins: If you
> have
> >>>> two input topic that you want to join, the join happens on a
> >>>> per-partition basis, ie, topic-p0 is joined to topicB-p0 etc and thus
> >>>> both partitions must be assigned to the same task (to get
> co-partitioned
> >>>> data processed together).
> >>>>
> >>>> Note, that the following program would create independent tasks as it
> >>>> consist of two independent sub-topologies:
> >>>>
> >>>> builder.stream("topicA").filter().to();
> >>>> builder.stream("topicB").filter().to();
> >>>>
> >>>> However, the next program would be one sub-topology and thus we apply
> >>>> the "join" rule (as we don't really know if you actually execute a
> join
> >>>> or not when we create tasks):
> >>>>
> >>>> KStream s1 = builder.stream("topicA");
> >>>> builser.stream("topicB").merge(s1).filter().to();
> >>>>
> >>>>
> >>>> Having said that, I agree that it would be a nice improvement to be
> more
> >>>> clever about it. However, it not easy to do. There is actually a
> related
> >>>> ticket: https://issues.apache.org/jira/browse/KAFKA-9282
> >>>>
> >>>>
> >>>> Hope this helps.
> >>>>   -Matthias
> >>>>
> >>>> On 9/2/20 11:09 PM, Pushkar Deole wrote:
> >>>>> Hi,
> >>>>>
> >>>>> I came across articles where it is explained how parallelism is
> handled
> >>>> in
> >>>>> kafka streams. This is what I collected:
> >>>>> When the streams application is reading from multiple topics, the
> topic
> >>>>> with maximum number of partitions is considered for instantiating
> >> stream
> >>>>> tasks so 1 task is instantiated per partition.
> >>>>> Now, if the stream task is reading from multiple topics then the
> >>>> partitions
> >>>>> of multiple topics are shared among those stream tasks.
> >>>>>
> >>>>> For example, Topic A and B has 5 partitions each then 5 tasks are
> >>>>> instantiated and assigned to 5 stream threads where each task is
> >>>> assigned 1
> >>>>> partition from Topic A and Topic B.
> >>>>>
> >>>>> The question here is : if I would want 1 task to be created for each
> >>>>> partition from the input topic then is this possible? e.g. I would
> want
> >>>> to
> >>>>> have 5 tasks for topic A and 5 for B and then would want 10 threads
> to
> >>>>> handle those. How can this be achieved?
> >>>>>
> >>>>
> >>>>
> >>>
> >>
> >>
> >
>


Re: Kafka streams parallelism - why not separate stream task per partition per input topic

2020-10-06 Thread Pushkar Deole
Matthias,

I am just wondering how the tasks will be spread across threads in case I
have fewer threads than the number of partitions. Taking my specific
use case, I have 3 input topics with 8 partitions each and I can configure
12 threads, so how will the partitions of the topics below be distributed among 12
threads?
Note that topic C is generally idle and carries traffic only sometimes, so
I would want the partitions from topic C to be evenly distributed, so that all
partitions from topic C don't get assigned to only some of the threads.

Topic A - 8 partitions
Topic B - 8 partitions
Topic C - 8 partitions

On Fri, Sep 4, 2020 at 9:56 PM Matthias J. Sax  wrote:

> That is correct.
>
> If topicA has 5 partitions and topicB has 6 partitions, you get 5 tasks
> for the first sub-topology and 6 tasks for the second sub-topology and
> you can run up to 11 threads, each executing one task.
>
>
> -Matthias
>
> On 9/4/20 1:30 AM, Pushkar Deole wrote:
> > Matthias,
> >
> > Let's say we have independent sub topologies like: in this case, will the
> > streams create tasks equal to the total number of partitions from topicA
> > and topicB, and can we assign stream thread count that is sum of the
> > partition of the two topics?
> >
> > builder.stream("topicA").filter().to();
> > builder.stream("topicB").filter().to();
> >
> > On Thu, Sep 3, 2020 at 8:28 PM Matthias J. Sax  wrote:
> >
> >> Well, it depends on your program.
> >>
> >> The reason for the current task creating strategy are joins: If you have
> >> two input topic that you want to join, the join happens on a
> >> per-partition basis, ie, topic-p0 is joined to topicB-p0 etc and thus
> >> both partitions must be assigned to the same task (to get co-partitioned
> >> data processed together).
> >>
> >> Note, that the following program would create independent tasks as it
> >> consist of two independent sub-topologies:
> >>
> >> builder.stream("topicA").filter().to();
> >> builder.stream("topicB").filter().to();
> >>
> >> However, the next program would be one sub-topology and thus we apply
> >> the "join" rule (as we don't really know if you actually execute a join
> >> or not when we create tasks):
> >>
> >> KStream s1 = builder.stream("topicA");
> >> builser.stream("topicB").merge(s1).filter().to();
> >>
> >>
> >> Having said that, I agree that it would be a nice improvement to be more
> >> clever about it. However, it not easy to do. There is actually a related
> >> ticket: https://issues.apache.org/jira/browse/KAFKA-9282
> >>
> >>
> >> Hope this helps.
> >>   -Matthias
> >>
> >> On 9/2/20 11:09 PM, Pushkar Deole wrote:
> >>> Hi,
> >>>
> >>> I came across articles where it is explained how parallelism is handled
> >> in
> >>> kafka streams. This is what I collected:
> >>> When the streams application is reading from multiple topics, the topic
> >>> with maximum number of partitions is considered for instantiating
> stream
> >>> tasks so 1 task is instantiated per partition.
> >>> Now, if the stream task is reading from multiple topics then the
> >> partitions
> >>> of multiple topics are shared among those stream tasks.
> >>>
> >>> For example, Topic A and B has 5 partitions each then 5 tasks are
> >>> instantiated and assigned to 5 stream threads where each task is
> >> assigned 1
> >>> partition from Topic A and Topic B.
> >>>
> >>> The question here is : if I would want 1 task to be created for each
> >>> partition from the input topic then is this possible? e.g. I would want
> >> to
> >>> have 5 tasks for topic A and 5 for B and then would want 10 threads to
> >>> handle those. How can this be achieved?
> >>>
> >>
> >>
> >
>
>


started getting TopologyException: Invalid topology after moving to streams-2.5.1

2020-10-05 Thread Pushkar Deole
Hi All,

After moving to the kafka-streams 2.5.1 version, one of our services started
failing with the below exception. Any idea what this is about and why it was
passing with 2.5.0? Were any changes made in 2.5.1 that break this?

Exception in thread "main"
org.springframework.context.ApplicationContextException: Failed to start
bean 'defaultKafkaStreamsBuilder'; nested exception is
org.springframework.kafka.KafkaException: Could not start stream: ; nested
exception is org.apache.kafka.streams.errors.TopologyException: Invalid
topology: Topology has no stream threads and no global threads, must
subscribe to at least one source topic or global table.

at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185)
at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53)
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360)
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158)
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122)
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:895)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:554)
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:143)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:758)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:750)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:315)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226)
at com.avaya.analytics.AnalyticsStreamsDataPublisherApplication.main(AnalyticsStreamsDataPublisherApplication.java:31)

Caused by: org.springframework.kafka.KafkaException: Could not start
stream: ; nested exception is
org.apache.kafka.streams.errors.TopologyException: Invalid topology:
Topology has no stream threads and no global threads, must subscribe to at
least one source topic or global table.

at org.springframework.kafka.config.StreamsBuilderFactoryBean.start(StreamsBuilderFactoryBean.java:326)
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182)
... 14 more

Caused by: org.apache.kafka.streams.errors.TopologyException: Invalid
topology: Topology has no stream threads and no global threads, must
subscribe to at least one source topic or global table.

at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:728)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:587)
at org.springframework.kafka.config.StreamsBuilderFactoryBean.start(StreamsBuilderFactoryBean.java:311)
... 15 more
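
For illustration (not from the original message): the stack trace shows the
KafkaStreams constructor rejecting a topology with no source topics, which
this report says slipped through on 2.5.0. One quick way to see what the
Spring-managed builder actually produced is to print Topology#describe()
before KafkaStreams is constructed; an empty description means no
KStream/KTable/GlobalKTable was ever wired into that builder. A minimal sketch:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;

public class TopologyCheck {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // ... the DSL calls that are supposed to add source topics would go here ...
        Topology topology = builder.build();
        // An empty description here is exactly the case the 2.5.1 constructor rejects.
        System.out.println(topology.describe());
    }
}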


Re: Kafka streams - how to handle application level exception in event processing

2020-09-23 Thread Pushkar Deole
Thanks Bruno.. yeah, I think I could figure it out... For dependencies such
as the database, for which all the events will be blocked, we are planning to
put in a retry mechanism, so processing will wait until the database
connection is back up. If the problem is with the incoming event, like a bad
format etc., then we can skip that event and log it or add it to a dead
letter queue topic.
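
As a rough sketch of those two error classes inside a Kafka Streams
processor (2.5 Processor API, not code from the thread): transient database
errors block and retry the same event, while malformed events go to a
dead-letter topic and are skipped. parseAndProvision(), TransientDbException,
the sleep interval and the topic name are all assumptions; blocking inside
process() for too long will eventually hit max.poll.interval.ms, and writing
to the DLQ with a separate producer sits outside any exactly-once guarantees.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class ProvisioningProcessor implements Processor<String, String> {
    private final KafkaProducer<String, String> dlqProducer;

    public ProvisioningProcessor(KafkaProducer<String, String> dlqProducer) {
        this.dlqProducer = dlqProducer;
    }

    @Override
    public void init(ProcessorContext context) { }

    @Override
    public void process(String key, String value) {
        while (true) {
            try {
                parseAndProvision(key, value);   // hypothetical parsing + DB call
                return;
            } catch (TransientDbException e) {
                sleep(5000);                     // block this task until the DB is back up
            } catch (Exception e) {
                // bad event (e.g. malformed payload): park it on a DLQ topic and move on
                dlqProducer.send(new ProducerRecord<>("provisioning-dlq", key, value));
                return;
            }
        }
    }

    @Override
    public void close() { }

    private void parseAndProvision(String key, String value) throws TransientDbException { }

    private static void sleep(long ms) {
        try { Thread.sleep(ms); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }

    static class TransientDbException extends Exception { }
}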

On Wed, Sep 23, 2020 at 4:04 PM Bruno Cadonna  wrote:

> Hi Pushkar,
>
> if you do not want to lose any event, you should cache the events
> somewhere (e.g. a state store) in case there is an issue with an
> external system you connect to (e.g. database issue). If the order of
> the event is important, you must ensure that the events in your cache
> are processed in the order they where written to the cache (i.e.
> first-in first-out).
>
> Maybe you can find some good hints in the links Gilles posted.
>
> Best,
> Bruno
>
> On 22.09.20 10:51, Pushkar Deole wrote:
> > Thank you Gilles..will take a look..
> >
> > Bruno, thanks for your elaborate explanation as well... however it
> > basically exposes my application to certain issues..
> >
> > e.g. the application deals with agent states of a call center, and where
> > the order of processing is important. So when agent is logged in then he
> > keeps rotating between Ready, and Not ready states and at the end of the
> > day he becomes Logged out... If while processing the Ready event, there
> is
> > some temporary issue with database/network and the event processing gets
> > exception, application does few retries but no luck.
> > As per kafka polling, it will go ahead and poll next record from
> partition
> > for the same agent (since agent id being key) and it will process logged
> > out event. So, this mean i lost the Ready event in between due to the
> > database issue? Even if i store this event somewhere for processing it
> > later, processing the Ready event after logged out, doesn't make sense
> > since order of state is important? Is my u
> >
> > On Tue, Sep 22, 2020 at 1:32 PM Gilles Philippart
> >  wrote:
> >
> >> Hi Pushkar,
> >>
> >> Uber has written about how they deal with failures and reprocessing
> here,
> >> it might help you achieve what you describe:
> >> https://eng.uber.com/reliable-reprocessing/.
> >>
> >> Unfortunately, there isn't much written documentation about those
> patterns.
> >> There's also a good talk from Confluent's Antony Stubbs on how you can
> do
> >> certain things with the Processor API that you can't do with the Kafka
> >> Streams DSL:
> >>
> >>
> https://www.confluent.io/kafka-summit-lon19/beyond-dsl-unlocking-power-kafka-streams-processor-api
> >> .
> >>
> >> Gilles Philippart
> >> Funding Circle Engineering
> >>
> >> On Tue, 22 Sep 2020 at 08:12, Bruno Cadonna  wrote:
> >>
> >>> Hi Pushkar,
> >>>
> >>> I think there is a misunderstanding. If a consumer polls from a
> >>> partition, it will always poll the next event independently whether the
> >>> offset was committed or not. Committed offsets are used for fault
> >>> tolerance, i.e., when a consumer crashes, the consumer that takes over
> >>> the work of the crashed consumer will start polling record from the
> >>> offset the crashed consumer committed last. This is not only true for
> >>> Kafka Streams, but for all applications that use a Kafka consumer with
> >>> subscription.
> >>>
> >>> To be clear, my proposal is not a workaround. This is one approach to
> >>> solve your problem in Kafka Streams. You could have a look into
> >>> stream-stream joins if you can use a stream instead of a global table.
> >>> Another approach would be to use a plain Kafka consumer instead of
> Kafka
> >>> Stream with which you would have a more fine-grained control about
> polls
> >>> and commits. In any case, be aware that blocking processing on an event
> >>> indefinitely may result in your lag and/or your state growing
> >>> indefinitely.
> >>>
> >>> If you think there is something missing in Kafka Streams, you are very
> >>> welcome to search through the tickets in
> >>> https://issues.apache.org/jira/projects/KAFKA/issues and comment on
> >>> tickets that would solve your issue or create a new one if you cannot
> >>> find any.
> >>>
> >>> Best,
> >>> Bruno
> >>>
> >>> On 22.09.20 05:09,

Re: Kafka streams - how to handle application level exception in event processing

2020-09-22 Thread Pushkar Deole
Thank you Gilles..will take a look..

Bruno, thanks for your elaborate explanation as well... however it
basically exposes my application to certain issues..

E.g., the application deals with agent states of a call center, where
the order of processing is important. When an agent is logged in, he
keeps rotating between Ready and Not Ready states, and at the end of the
day he becomes Logged Out... If, while processing the Ready event, there is
some temporary issue with the database/network and the event processing gets
an exception, the application does a few retries but with no luck.
As per kafka polling, it will go ahead and poll the next record from the
partition for the same agent (since the agent id is the key) and it will
process the Logged Out event. So, does this mean I lost the Ready event in
between due to the database issue? Even if I store this event somewhere to
process it later, processing the Ready event after Logged Out doesn't make
sense, since the order of states is important. Is my u

On Tue, Sep 22, 2020 at 1:32 PM Gilles Philippart
 wrote:

> Hi Pushkar,
>
> Uber has written about how they deal with failures and reprocessing here,
> it might help you achieve what you describe:
> https://eng.uber.com/reliable-reprocessing/.
>
> Unfortunately, there isn't much written documentation about those patterns.
> There's also a good talk from Confluent's Antony Stubbs on how you can do
> certain things with the Processor API that you can't do with the Kafka
> Streams DSL:
>
> https://www.confluent.io/kafka-summit-lon19/beyond-dsl-unlocking-power-kafka-streams-processor-api
> .
>
> Gilles Philippart
> Funding Circle Engineering
>
> On Tue, 22 Sep 2020 at 08:12, Bruno Cadonna  wrote:
>
> > Hi Pushkar,
> >
> > I think there is a misunderstanding. If a consumer polls from a
> > partition, it will always poll the next event independently whether the
> > offset was committed or not. Committed offsets are used for fault
> > tolerance, i.e., when a consumer crashes, the consumer that takes over
> > the work of the crashed consumer will start polling record from the
> > offset the crashed consumer committed last. This is not only true for
> > Kafka Streams, but for all applications that use a Kafka consumer with
> > subscription.
> >
> > To be clear, my proposal is not a workaround. This is one approach to
> > solve your problem in Kafka Streams. You could have a look into
> > stream-stream joins if you can use a stream instead of a global table.
> > Another approach would be to use a plain Kafka consumer instead of Kafka
> > Stream with which you would have a more fine-grained control about polls
> > and commits. In any case, be aware that blocking processing on an event
> > indefinitely may result in your lag and/or your state growing
> > indefinitely.
> >
> > If you think there is something missing in Kafka Streams, you are very
> > welcome to search through the tickets in
> > https://issues.apache.org/jira/projects/KAFKA/issues and comment on
> > tickets that would solve your issue or create a new one if you cannot
> > find any.
> >
> > Best,
> > Bruno
> >
> > On 22.09.20 05:09, Pushkar Deole wrote:
> > > Bruno,
> > >
> > > So, essentially, we are just waiting on the processing of first event
> > that
> > > got an error before going ahead on to the next one.
> > >
> > > Second, if application handles storing the events in state store for
> > retry,
> > > Kafka stream would essentially commit the offset of those events, so
> next
> > > event will be polled by consumer, correct?
> > >
> > > Instead of this work around, is there any provision in kafka streams
> for
> > > this scenario? e.g. in case application registers application level
> > > exceptions then kafka streams will take care of it and do all this
> > > internally, and will not commit the offset of that event and hence will
> > > keep polling the same event again?
> > > Since this is a common scenario, using a particular configuration for
> > users
> > > can achieve this in Kafka streams internally?
> > >
> > >
> > > On Mon, Sep 21, 2020 at 9:01 PM Bruno Cadonna 
> > wrote:
> > >
> > >> Hi Pushkar,
> > >>
> > >> If you want to keep the order, you could still use the state store I
> > >> suggested in my previous e-mail and implement a queue on top of it.
> For
> > >> that you need to put the events into the store with a key that
> > >> represents the arrival order of the events. Each time a record is
> > >> received from the input topic, the events

Re: Kafka streams - how to handle application level exception in event processing

2020-09-21 Thread Pushkar Deole
Bruno,

So, essentially, we are just waiting on the processing of the first event
that got an error before going ahead to the next one.

Second, if the application handles storing the events in a state store for
retry, Kafka Streams would essentially commit the offsets of those events,
so the next event will be polled by the consumer, correct?

Instead of this workaround, is there any provision in kafka streams for
this scenario? E.g., if the application registers application-level
exceptions, then kafka streams would take care of it and do all this
internally, i.e. not commit the offset of that event and hence keep polling
the same event again?
Since this is a common scenario, could a particular configuration let users
achieve this in Kafka Streams internally?


On Mon, Sep 21, 2020 at 9:01 PM Bruno Cadonna  wrote:

> Hi Pushkar,
>
> If you want to keep the order, you could still use the state store I
> suggested in my previous e-mail and implement a queue on top of it. For
> that you need to put the events into the store with a key that
> represents the arrival order of the events. Each time a record is
> received from the input topic, the events are read in arrival order from
> the state store and the data in the global table is probed. If an event
> matches data from the global table the event is removed from the state
> store and emitted. If an event does not match data from the global table
> the processing is stopped and nothing is emitted.
>
> Best,
> Bruno
>
> On 21.09.20 14:21, Pushkar Deole wrote:
> > Bruno,
> >
> > 1. the loading of topic mapped to GlobalKTable is done by some other
> > service/application so when my application starts up, it will just sync a
> > GlobalKTable against that topic and if that other service/application is
> > still starting up then it may not have loaded that data on that topic and
> > that's the reason it is not available to my application through the
> > GlobalKTable.
> >
> > 2. I don't want out of order processing to happen, that's the reason I
> want
> > my streams application to keep trying same event until the other
> > application starts up and required data becomes available in globalKtable
> >
> >
> > On Mon, Sep 21, 2020 at 5:42 PM Bruno Cadonna 
> wrote:
> >
> >> Thank you for clarifying! Now, I think I understand.
> >>
> >> You could put events for which required data in the global table is not
> >> available into a state store and each time an event from the input topic
> >> is processed, you could lookup all events in your state store and see if
> >> required data is now available for them.
> >>
> >> However, be aware that this can mix up the original order of the events
> >> in your input topic if required data of later events is available before
> >> required data of earlier events. Furthermore, you need to consider the
> >> case when you have a huge amount of events in the state store and
> >> suddenly all required data in the global table is available, because
> >> processing all those events at once might lead to exceeding
> >> max.poll.interval.ms and the stream thread might be kicked out of the
> >> consumer group. To solve that, you may want to limit the number of
> >> events processed at once. You also need to avoid the state store growing
> >> indefinitely if required data in the global table is not available for a
> >> long time or not available at all. Maybe all this caveats do not apply
> >> to your use case.
> >>
> >> Best,
> >> Bruno
> >>
> >>
> >> On 21.09.20 13:45, Pushkar Deole wrote:
> >>> Say the application level exception is named as :
> >>> MeasureDefinitionNotAvaialbleException
> >>>
> >>> What I am trying to achieve is: in above case when the event processing
> >>> fails due to required data not available, the streams should not
> proceed
> >> on
> >>> to next event, however it should wait for the processing of current
> event
> >>> to complete. If I just catch the MeasureDefinitionNotAvaialbleException
> >> in
> >>> processor and log it then the stream will proceed onto next event
> >>> considering the current event processing got successful right?
> >>>
> >>> On Mon, Sep 21, 2020 at 5:11 PM Pushkar Deole 
> >> wrote:
> >>>
> >>>> It is not a kafka streams error, it is an application level error e.g.
> >>>> say, some data required for processing an input event is not available
> >> in
> >>>> the GlobalKTable since it is not yet sync
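
As an illustrative sketch of the state-store queue Bruno describes above
(not code from the thread): events are appended to a KeyValueStore under an
increasing sequence number and only emitted, in arrival order, once the head
of the queue can be resolved. lookupReferenceData() stands in for the
GlobalKTable probe, the store name is made up, and the buffered events'
original keys are dropped for brevity; the caveats Bruno mentions about
unbounded growth and max.poll.interval.ms apply here too.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

import java.util.ArrayList;
import java.util.List;

public class BufferingTransformer implements Transformer<String, String, Iterable<KeyValue<String, String>>> {

    public static final String STORE_NAME = "pending-events";

    public static StoreBuilder<KeyValueStore<Long, String>> storeBuilder() {
        return Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(STORE_NAME), Serdes.Long(), Serdes.String());
    }

    private KeyValueStore<Long, String> buffer;
    private long nextSeq = 0L;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        buffer = (KeyValueStore<Long, String>) context.getStateStore(STORE_NAME);
        try (KeyValueIterator<Long, String> it = buffer.all()) {  // recover the sequence after a restart
            while (it.hasNext()) {
                nextSeq = Math.max(nextSeq, it.next().key + 1);
            }
        }
    }

    @Override
    public Iterable<KeyValue<String, String>> transform(String key, String value) {
        buffer.put(nextSeq++, value);             // append the new event in arrival order

        List<KeyValue<String, String>> ready = new ArrayList<>();
        List<Long> drained = new ArrayList<>();
        try (KeyValueIterator<Long, String> it = buffer.all()) {  // key order == arrival order for non-negative Long keys
            while (it.hasNext()) {
                KeyValue<Long, String> head = it.next();
                if (!lookupReferenceData(head.value)) {
                    break;                        // head not resolvable yet: stop, keep the order
                }
                ready.add(KeyValue.pair(null, head.value));  // real code would restore the buffered event's original key
                drained.add(head.key);
            }
        }
        drained.forEach(buffer::delete);
        return ready;
    }

    @Override
    public void close() { }

    private boolean lookupReferenceData(String value) {
        return true;  // placeholder: probe the GlobalKTable-backed store here
    }
}

Wired in with something like builder.addStateStore(BufferingTransformer.storeBuilder())
and stream.flatTransform(BufferingTransformer::new, BufferingTransformer.STORE_NAME).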

kafka schema registry - some queries and questions

2020-09-21 Thread Pushkar Deole
Hi All,

I wanted to understand a bit more about the schema registry provided by
Confluent. Following are the queries:
1. Is the schema registry provided by Confluent on top of Apache
Kafka?
2. If a managed kafka service is used in the cloud, e.g. Aiven Kafka, is
the schema registry implementation different for different vendors,
i.e. does Aiven have their own implementation, or has Confluent open-sourced
the schema registry implementation?
3. Will the Confluent Avro client libraries work with the schema registry
of managed services from other vendors like Aiven?
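
For context, a sketch only (not an answer from the list): from the client
side, the Confluent Avro serializer just needs a registry URL and
credentials, as in the illustrative producer configuration below. Whether a
particular managed registry accepts those requests depends on the vendor
exposing a Confluent-compatible REST API, which is essentially what question
3 is asking.

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

// Illustrative producer config using the Confluent Avro serializer against a
// schema registry endpoint; the URL and credentials are placeholders.
public class AvroProducerConfig {
    public static Properties build() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka.example.com:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        props.put("schema.registry.url", "https://registry.example.com");
        props.put("basic.auth.credentials.source", "USER_INFO");
        props.put("basic.auth.user.info", "registry-user:registry-password");
        return props;
    }
}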


Re: Kafka streams - how to handle application level exception in event processing

2020-09-21 Thread Pushkar Deole
Bruno,

1. The loading of the topic mapped to the GlobalKTable is done by some other
service/application, so when my application starts up it will just sync a
GlobalKTable against that topic, and if that other service/application is
still starting up then it may not have loaded that data onto the topic yet;
that's the reason it is not available to my application through the
GlobalKTable.

2. I don't want out-of-order processing to happen; that's the reason I want
my streams application to keep trying the same event until the other
application starts up and the required data becomes available in the GlobalKTable.


On Mon, Sep 21, 2020 at 5:42 PM Bruno Cadonna  wrote:

> Thank you for clarifying! Now, I think I understand.
>
> You could put events for which required data in the global table is not
> available into a state store and each time an event from the input topic
> is processed, you could lookup all events in your state store and see if
> required data is now available for them.
>
> However, be aware that this can mix up the original order of the events
> in your input topic if required data of later events is available before
> required data of earlier events. Furthermore, you need to consider the
> case when you have a huge amount of events in the state store and
> suddenly all required data in the global table is available, because
> processing all those events at once might lead to exceeding
> max.poll.interval.ms and the stream thread might be kicked out of the
> consumer group. To solve that, you may want to limit the number of
> events processed at once. You also need to avoid the state store growing
> indefinitely if required data in the global table is not available for a
> long time or not available at all. Maybe all this caveats do not apply
> to your use case.
>
> Best,
> Bruno
>
>
> On 21.09.20 13:45, Pushkar Deole wrote:
> > Say the application level exception is named as :
> > MeasureDefinitionNotAvaialbleException
> >
> > What I am trying to achieve is: in above case when the event processing
> > fails due to required data not available, the streams should not proceed
> on
> > to next event, however it should wait for the processing of current event
> > to complete. If I just catch the MeasureDefinitionNotAvaialbleException
> in
> > processor and log it then the stream will proceed onto next event
> > considering the current event processing got successful right?
> >
> > On Mon, Sep 21, 2020 at 5:11 PM Pushkar Deole 
> wrote:
> >
> >> It is not a kafka streams error, it is an application level error e.g.
> >> say, some data required for processing an input event is not available
> in
> >> the GlobalKTable since it is not yet synced with the global topic
> >>
> >> On Mon, Sep 21, 2020 at 4:54 PM Bruno Cadonna 
> wrote:
> >>
> >>> Hi Pushkar,
> >>>
> >>> Is the error you are talking about, one that is thrown by Kafka Streams
> >>> or by your application? If it is thrown by Kafka Streams, could you
> >>> please post the error?
> >>>
> >>> I do not completely understand what you are trying to achieve, but
> maybe
> >>> max.task.idle.ms [1] is the configuration you are looking for.
> >>>
> >>> I can assure you that enable.auto.commit is false in Kafka Streams.
> What
> >>> you probably mean is that Kafka Streams periodically commits the
> >>> offsets. The commit interval can be controlled with commit.interval.ms
> >>> [2].
> >>>
> >>>
> >>> Best,
> >>> Bruno
> >>>
> >>>
> >>> [1] https://kafka.apache.org/documentation/#max.task.idle.ms
> >>> [2] https://kafka.apache.org/documentation/#commit.interval.ms
> >>>
> >>> On 21.09.20 12:38, Pushkar Deole wrote:
> >>>> Hi,
> >>>>
> >>>> I would like to know how to handle following scenarios while
> processing
> >>>> events in a kafka streams application:
> >>>>
> >>>> 1. the streams application needs data from a globalKtable which loads
> it
> >>>> from a topic that is populated by some other service/application. So,
> if
> >>>> the streams application starts getting events from input source topic
> >>>> however it doesn't find required data in GlobalKTable since that other
> >>>> application/service hasn't yet loaded that data then the Kafka streams
> >>>> application gets error while processing the event and application
> >>> handles
> >>>> the exception by logging  an error and it goes onto processing other
> >>>> events. Since auto.commit is true, the polling will go on fetching
> next
> >>>> batch and probably it will set the offset of previous batch, causing
> >>> loss
> >>>> of events that had an exception while processing.
> >>>>
> >>>> I want to halt the processing here if an error occurs while processing
> >>> the
> >>>> event, so instead of going on to the next event, the processing should
> >>> keep
> >>>> trying previous event until application level error is resolved. How
> >>> can I
> >>>> achieve this?
> >>>>
> >>>
> >>
> >
>


Re: Kafka streams - how to handle application level exception in event processing

2020-09-21 Thread Pushkar Deole
Say the application level exception is named:
MeasureDefinitionNotAvaialbleException

What I am trying to achieve is: in the above case, when the event processing
fails because the required data is not available, the streams should not proceed
on to the next event; it should instead wait for the processing of the current
event to complete. If I just catch the MeasureDefinitionNotAvaialbleException
in the processor and log it, then the stream will proceed onto the next event,
considering the current event processing successful, right?

On Mon, Sep 21, 2020 at 5:11 PM Pushkar Deole  wrote:

> It is not a kafka streams error, it is an application level error e.g.
> say, some data required for processing an input event is not available in
> the GlobalKTable since it is not yet synced with the global topic
>
> On Mon, Sep 21, 2020 at 4:54 PM Bruno Cadonna  wrote:
>
>> Hi Pushkar,
>>
>> Is the error you are talking about, one that is thrown by Kafka Streams
>> or by your application? If it is thrown by Kafka Streams, could you
>> please post the error?
>>
>> I do not completely understand what you are trying to achieve, but maybe
>> max.task.idle.ms [1] is the configuration you are looking for.
>>
>> I can assure you that enable.auto.commit is false in Kafka Streams. What
>> you probably mean is that Kafka Streams periodically commits the
>> offsets. The commit interval can be controlled with commit.interval.ms
>> [2].
>>
>>
>> Best,
>> Bruno
>>
>>
>> [1] https://kafka.apache.org/documentation/#max.task.idle.ms
>> [2] https://kafka.apache.org/documentation/#commit.interval.ms
>>
>> On 21.09.20 12:38, Pushkar Deole wrote:
>> > Hi,
>> >
>> > I would like to know how to handle following scenarios while processing
>> > events in a kafka streams application:
>> >
>> > 1. the streams application needs data from a globalKtable which loads it
>> > from a topic that is populated by some other service/application. So, if
>> > the streams application starts getting events from input source topic
>> > however it doesn't find required data in GlobalKTable since that other
>> > application/service hasn't yet loaded that data then the Kafka streams
>> > application gets error while processing the event and application
>> handles
>> > the exception by logging  an error and it goes onto processing other
>> > events. Since auto.commit is true, the polling will go on fetching next
>> > batch and probably it will set the offset of previous batch, causing
>> loss
>> > of events that had an exception while processing.
>> >
>> > I want to halt the processing here if an error occurs while processing
>> the
>> > event, so instead of going on to the next event, the processing should
>> keep
>> > trying previous event until application level error is resolved. How
>> can I
>> > achieve this?
>> >
>>
>


Re: Kafka streams - how to handle application level exception in event processing

2020-09-21 Thread Pushkar Deole
It is not a Kafka Streams error; it is an application-level error. For example,
some data required for processing an input event is not available in the
GlobalKTable because it is not yet in sync with the global topic.

On Mon, Sep 21, 2020 at 4:54 PM Bruno Cadonna  wrote:

> Hi Pushkar,
>
> Is the error you are talking about, one that is thrown by Kafka Streams
> or by your application? If it is thrown by Kafka Streams, could you
> please post the error?
>
> I do not completely understand what you are trying to achieve, but maybe
> max.task.idle.ms [1] is the configuration you are looking for.
>
> I can assure you that enable.auto.commit is false in Kafka Streams. What
> you probably mean is that Kafka Streams periodically commits the
> offsets. The commit interval can be controlled with commit.interval.ms
> [2].
>
>
> Best,
> Bruno
>
>
> [1] https://kafka.apache.org/documentation/#max.task.idle.ms
> [2] https://kafka.apache.org/documentation/#commit.interval.ms
>
> On 21.09.20 12:38, Pushkar Deole wrote:
> > Hi,
> >
> > I would like to know how to handle following scenarios while processing
> > events in a kafka streams application:
> >
> > 1. the streams application needs data from a globalKtable which loads it
> > from a topic that is populated by some other service/application. So, if
> > the streams application starts getting events from input source topic
> > however it doesn't find required data in GlobalKTable since that other
> > application/service hasn't yet loaded that data then the Kafka streams
> > application gets error while processing the event and application handles
> > the exception by logging  an error and it goes onto processing other
> > events. Since auto.commit is true, the polling will go on fetching next
> > batch and probably it will set the offset of previous batch, causing loss
> > of events that had an exception while processing.
> >
> > I want to halt the processing here if an error occurs while processing
> the
> > event, so instead of going on to the next event, the processing should
> keep
> > trying previous event until application level error is resolved. How can
> I
> > achieve this?
> >
>


Kafka streams - how to handle application level exception in event processing

2020-09-21 Thread Pushkar Deole
Hi,

I would like to know how to handle the following scenario while processing
events in a Kafka Streams application:

1. The streams application needs data from a GlobalKTable, which loads it from
a topic populated by some other service/application. If the streams application
starts receiving events from its input source topic but does not find the
required data in the GlobalKTable (because that other application/service has
not loaded it yet), then the application gets an error while processing the
event; it handles the exception by logging an error and goes on to process the
next events. Since auto.commit is true, polling will keep fetching the next
batch and will probably commit the offsets of the previous batch, causing loss
of the events whose processing failed.

I want to halt processing if an error occurs while processing an event: instead
of going on to the next event, processing should keep retrying the current
event until the application-level error is resolved. How can I achieve this?


Re: Kafka stream error - Consumer is not subscribed to any topics or assigned any partitions

2020-09-14 Thread Pushkar Deole
Sophie, one more question: will just upgrading the kafka-streams jar to 2.5.1
work, or do other jars, e.g. kafka-clients, also need to be upgraded to 2.5.1?

On Mon, Sep 14, 2020 at 7:16 PM Pushkar Deole  wrote:

> Thanks Sophie... if we are just creating a global state store
> (GlobalKTable for instance) from a topic, then that is what you are calling
> as global-only topology. In our application that is what we are doing and
> there is no source topic for the stream to process data from, i mean there
> is however it is done through a consumer-producer kind of design and not
> through stream topology.
>
> On Fri, Sep 11, 2020 at 10:58 PM Sophie Blee-Goldman 
> wrote:
>
>> You should upgrade to 2.5.1, it contains a fix for this.
>>
>> Technically the "fix" is just to automatically set the num.stream.threads
>> to 0
>> when a global-only topology is detected, so setting this manually would
>> accomplish the same thing. But the fix also includes a tweak of the
>> KafkaStreams state machine to make sure it reaches the RUNNING state
>> even with no stream threads. So if you use a state listener, you'll want
>> to
>> use 2.5.1
>>
>> It's always a good idea to upgrade when a new bugfix version is released
>> anyway
>>
>> On Fri, Sep 11, 2020 at 5:15 AM Pushkar Deole 
>> wrote:
>>
>> > Hi All,
>> >
>> > I upgraded from Kafka streams 2.4 to 2.5.0 and one of the applications
>> > suddenly stopped working with the error message:
>> >
>> > Exception in thread
>> > "DsiApplication-0fcde033-dab2-431c-9d82-76e85fcb4c91-StreamThread-1"
>> > java.lang.IllegalStateException: Consumer is not subscribed to any
>> topics
>> > or assigned any partitions
>> > at
>> >
>> >
>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1228)
>> > at
>> >
>> >
>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
>> > at
>> >
>> >
>> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:853)
>> > at
>> >
>> >
>> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)
>> > at
>> >
>> >
>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
>> > at
>> >
>> >
>> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
>> >
>> > This application uses streams just to create a global state store from a
>> > topic in order to create a global state store as a cache for static data
>> > across application instances and the stream doesn't consume from any
>> input
>> > topic. Came across following thread on stackoverflow
>> >
>> >
>> https://stackoverflow.com/questions/61342530/kafka-streams-2-5-0-requires-input-topic
>> >
>> > Matthias, I see you have answered some queries there, so would like to
>> > confirm if setting num.stream.threads to 0 will solve this issue?
>> >
>>
>


Re: Kafka stream error - Consumer is not subscribed to any topics or assigned any partitions

2020-09-14 Thread Pushkar Deole
Thanks Sophie... if we are just creating a global state store (a GlobalKTable,
for instance) from a topic, then that is what you are calling a global-only
topology. That is what we are doing in our application: there is no source
topic for the stream to process data from. Well, there is one, but it is
handled through a consumer-producer kind of design rather than through the
streams topology.

On Fri, Sep 11, 2020 at 10:58 PM Sophie Blee-Goldman 
wrote:

> You should upgrade to 2.5.1, it contains a fix for this.
>
> Technically the "fix" is just to automatically set the num.stream.threads
> to 0
> when a global-only topology is detected, so setting this manually would
> accomplish the same thing. But the fix also includes a tweak of the
> KafkaStreams state machine to make sure it reaches the RUNNING state
> even with no stream threads. So if you use a state listener, you'll want to
> use 2.5.1
>
> It's always a good idea to upgrade when a new bugfix version is released
> anyway
>
> On Fri, Sep 11, 2020 at 5:15 AM Pushkar Deole 
> wrote:
>
> > Hi All,
> >
> > I upgraded from Kafka streams 2.4 to 2.5.0 and one of the applications
> > suddenly stopped working with the error message:
> >
> > Exception in thread
> > "DsiApplication-0fcde033-dab2-431c-9d82-76e85fcb4c91-StreamThread-1"
> > java.lang.IllegalStateException: Consumer is not subscribed to any topics
> > or assigned any partitions
> > at
> >
> >
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1228)
> > at
> >
> >
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:853)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
> >
> > This application uses streams just to create a global state store from a
> > topic in order to create a global state store as a cache for static data
> > across application instances and the stream doesn't consume from any
> input
> > topic. Came across following thread on stackoverflow
> >
> >
> https://stackoverflow.com/questions/61342530/kafka-streams-2-5-0-requires-input-topic
> >
> > Matthias, I see you have answered some queries there, so would like to
> > confirm if setting num.stream.threads to 0 will solve this issue?
> >
>


Kafka stream error - Consumer is not subscribed to any topics or assigned any partitions

2020-09-11 Thread Pushkar Deole
Hi All,

I upgraded from Kafka streams 2.4 to 2.5.0 and one of the applications
suddenly stopped working with the error message:

Exception in thread
"DsiApplication-0fcde033-dab2-431c-9d82-76e85fcb4c91-StreamThread-1"
java.lang.IllegalStateException: Consumer is not subscribed to any topics
or assigned any partitions
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1228)
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
at
org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:853)
at
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)
at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)

This application uses Streams only to build a global state store from a topic,
as a cache of static data shared across application instances; the stream does
not consume from any input topic. I came across the following thread on Stack
Overflow:
https://stackoverflow.com/questions/61342530/kafka-streams-2-5-0-requires-input-topic

Matthias, I see you have answered some queries there, so I would like to
confirm: will setting num.stream.threads to 0 solve this issue?
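
For reference, a minimal sketch of such a global-only application with the
num.stream.threads = 0 workaround discussed in this thread (topic name,
application id and bootstrap servers are placeholders; on 2.5.1 the thread
count is adjusted automatically, so the explicit setting is only needed on
2.5.0):

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;

// Sketch: a topology that only materializes a GlobalKTable and has no regular
// source topics, started with zero stream threads.
public class GlobalStoreOnlyApp {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        // "static-data-topic" is a placeholder for the topic holding the static data
        builder.globalTable("static-data-topic",
                Consumed.with(Serdes.String(), Serdes.String()));

        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "global-store-only-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // No regular source topics, so no stream threads are needed (2.5.0 workaround)
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 0);

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}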


Re: Error in kafka streams: The producer attempted to use a producer id which is not currently assigned to its transactional id

2020-09-09 Thread Pushkar Deole
Matthias,

Is there any workaround once the stream goes into error because of the above
issue, such as attaching a StateListener to the KafkaStreams instance and
restarting the stream when it reaches the ERROR state?
Right now we need to restart the pod that hosts the application, which won't
be feasible once the application goes into production.
On Thu, Sep 10, 2020 at 2:20 AM Matthias J. Sax  wrote:

> Well, it's for sure EOS related, but it seems to be a different root cause.
>
> I am not aware of any related bug.
>
> -Matthias
>
>
> On 9/9/20 4:29 AM, Pushkar Deole wrote:
> > Hi Matthias,
> >
> > We are using confluent kafka and upgraded to confluent version 5.5.0
> which
> > I believe maps to 2.5.0 of apache kafka. We tested on few labs by keeping
> > the solution idle for few days and didn't observe the issue.
> >
> > However on one of the labs we observed issue again recently, this is the
> > exception: unfortunately, don't have complete stack trace.
> > Anyway, do you think it is same exception as above or is it different?
> and
> > whether this is also a kafka server issue that is being reported already?
> >
> >
> {"@timestamp":"2020-08-12T09:17:35.270+00:00","@version":"1","message":"Unexpected
> > exception in stream
> >
> processing.","logger_name":"com.avaya.analytics.filter.ApplicationConfig","thread_name":"analytics-event-filter-Str
> >
> eamThread-1","level":"ERROR","level_value":4,"stack_trace":"org.apache.kafka.common.errors.InvalidPidMappingException:
> > The producer attempted to use a producer id which is not currently
> assigned
> > to its transactiona l id.\n"}
> >
> > On Sat, May 9, 2020 at 12:49 AM Matthias J. Sax 
> wrote:
> >
> >>>> So does this issue relate to transactions which are used only when
> >>>> exactly_once guarantee is set?
> >>
> >> Correct.
> >>
> >> On 5/8/20 6:28 AM, Pushkar Deole wrote:
> >>> Hello Matthias,
> >>>
> >>> By the way, this error seems to be occurring in only one of the
> services.
> >>> There is another service which is also using kafka streams to consumer
> >> from
> >>> source, uses processors and then a sink to the output topic, however
> that
> >>> service is running fine. The difference is this other service is using
> >>> at_least_once guarantee while the service in error is exactly once
> >>> guarantee.
> >>> So does this issue relate to transactions which are used only when
> >>> exactly_once guarantee is set?
> >>>
> >>> On Mon, Apr 27, 2020 at 12:37 PM Pushkar Deole 
> >> wrote:
> >>>
> >>>> came across this: seems to be the one
> >>>> https://issues.apache.org/jira/browse/KAFKA-8710
> >>>>
> >>>> On Mon, Apr 27, 2020 at 12:17 PM Pushkar Deole 
> >>>> wrote:
> >>>>
> >>>>> Thanks... can you point to those improvements/bugs that are fixed in
> >> 2.5?
> >>>>>
> >>>>> On Mon, Apr 27, 2020 at 1:03 AM Matthias J. Sax 
> >> wrote:
> >>>>>
> >>>>>> Well, what you say is correct. However, it's a "bug" in the sense
> that
> >>>>>> for some cases the producer does not need to fail, but can
> >> re-initialize
> >>>>>> itself automatically. Of course, you can also see this as an
> >> improvement
> >>>>>> and not a bug :)
> >>>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>> On 4/25/20 7:48 AM, Pushkar Deole wrote:
> >>>>>>> version used is 2.3
> >>>>>>> however, not sure if this is a bug.. after doing some search, came
> >>>>>> across
> >>>>>>> following for the reason of this:
> >>>>>>>
> >>>>>>> essentially, the transaction coordinator of streams is cleaning up
> >> the
> >>>>>>> producer and transaction ids after a certain time interval
> controller
> >>>>>> by
> >>>>>>> transactional.id.expiration.ms
> >>>>>>> <
> >>>>>>
> >>
> https://docs.confluent.io/current/installation/configuration/broker-configs.html#transactional-id-expiration-ms
> >&

Re: Error in kafka streams: The producer attempted to use a producer id which is not currently assigned to its transactional id

2020-09-09 Thread Pushkar Deole
Hi Matthias,

We are using Confluent Kafka and upgraded to Confluent version 5.5.0, which I
believe maps to Apache Kafka 2.5.0. We tested on a few labs by keeping the
solution idle for a few days and did not observe the issue.

However, on one of the labs we observed the issue again recently; this is the
exception (unfortunately, I don't have the complete stack trace).
Anyway, do you think it is the same exception as above or a different one? And
is this also a Kafka server issue that has already been reported?

{"@timestamp":"2020-08-12T09:17:35.270+00:00","@version":"1","message":"Unexpected
exception in stream
processing.","logger_name":"com.avaya.analytics.filter.ApplicationConfig","thread_name":"analytics-event-filter-Str
eamThread-1","level":"ERROR","level_value":4,"stack_trace":"org.apache.kafka.common.errors.InvalidPidMappingException:
The producer attempted to use a producer id which is not currently assigned
to its transactional id.\n"}

On Sat, May 9, 2020 at 12:49 AM Matthias J. Sax  wrote:

> >> So does this issue relate to transactions which are used only when
> >> exactly_once guarantee is set?
>
> Correct.
>
> On 5/8/20 6:28 AM, Pushkar Deole wrote:
> > Hello Matthias,
> >
> > By the way, this error seems to be occurring in only one of the services.
> > There is another service which is also using kafka streams to consumer
> from
> > source, uses processors and then a sink to the output topic, however that
> > service is running fine. The difference is this other service is using
> > at_least_once guarantee while the service in error is exactly once
> > guarantee.
> > So does this issue relate to transactions which are used only when
> > exactly_once guarantee is set?
> >
> > On Mon, Apr 27, 2020 at 12:37 PM Pushkar Deole 
> wrote:
> >
> >> came across this: seems to be the one
> >> https://issues.apache.org/jira/browse/KAFKA-8710
> >>
> >> On Mon, Apr 27, 2020 at 12:17 PM Pushkar Deole 
> >> wrote:
> >>
> >>> Thanks... can you point to those improvements/bugs that are fixed in
> 2.5?
> >>>
> >>> On Mon, Apr 27, 2020 at 1:03 AM Matthias J. Sax 
> wrote:
> >>>
> >>>> Well, what you say is correct. However, it's a "bug" in the sense that
> >>>> for some cases the producer does not need to fail, but can
> re-initialize
> >>>> itself automatically. Of course, you can also see this as an
> improvement
> >>>> and not a bug :)
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 4/25/20 7:48 AM, Pushkar Deole wrote:
> >>>>> version used is 2.3
> >>>>> however, not sure if this is a bug.. after doing some search, came
> >>>> across
> >>>>> following for the reason of this:
> >>>>>
> >>>>> essentially, the transaction coordinator of streams is cleaning up
> the
> >>>>> producer and transaction ids after a certain time interval controller
> >>>> by
> >>>>> transactional.id.expiration.ms
> >>>>> <
> >>>>
> https://docs.confluent.io/current/installation/configuration/broker-configs.html#transactional-id-expiration-ms
> >>>>> ,
> >>>>> if the coordinator doesn't receive any updates/writes from the
> >>>> producer for
> >>>>> that much time. Default of this parameter is 7 days and our labs have
> >>>> been
> >>>>> idle for more than that.
> >>>>>
> >>>>> On Fri, Apr 24, 2020 at 10:46 PM Matthias J. Sax 
> >>>> wrote:
> >>>>>
> >>>>>> This version are you using?
> >>>>>>
> >>>>>> Couple of broker and client side exactly-once related bugs got fix
> in
> >>>>>> the latest release 2.5.0.
> >>>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>> On 4/23/20 11:59 PM, Pushkar Deole wrote:
> >>>>>>> Hello All,
> >>>>>>>
> >>>>>>> While using kafka streams application, we are intermittently
> getting
> >>>>>>> following exception and stream is closed. We need to restart the
> >>>>>>> application to get it working again and start processing. 

Re: Kafka streams parallelism - why not separate stream task per partition per input topic

2020-09-04 Thread Pushkar Deole
Matthias,

Let's say we have independent sub-topologies like the ones below: in this
case, will Streams create tasks equal to the total number of partitions of
topicA and topicB, and can we set the stream thread count to the sum of the
partitions of the two topics?

builder.stream("topicA").filter().to();
builder.stream("topicB").filter().to();

On Thu, Sep 3, 2020 at 8:28 PM Matthias J. Sax  wrote:

> Well, it depends on your program.
>
> The reason for the current task creating strategy are joins: If you have
> two input topic that you want to join, the join happens on a
> per-partition basis, ie, topic-p0 is joined to topicB-p0 etc and thus
> both partitions must be assigned to the same task (to get co-partitioned
> data processed together).
>
> Note, that the following program would create independent tasks as it
> consist of two independent sub-topologies:
>
> builder.stream("topicA").filter().to();
> builder.stream("topicB").filter().to();
>
> However, the next program would be one sub-topology and thus we apply
> the "join" rule (as we don't really know if you actually execute a join
> or not when we create tasks):
>
> KStream s1 = builder.stream("topicA");
> builser.stream("topicB").merge(s1).filter().to();
>
>
> Having said that, I agree that it would be a nice improvement to be more
> clever about it. However, it not easy to do. There is actually a related
> ticket: https://issues.apache.org/jira/browse/KAFKA-9282
>
>
> Hope this helps.
>   -Matthias
>
> On 9/2/20 11:09 PM, Pushkar Deole wrote:
> > Hi,
> >
> > I came across articles where it is explained how parallelism is handled
> in
> > kafka streams. This is what I collected:
> > When the streams application is reading from multiple topics, the topic
> > with maximum number of partitions is considered for instantiating stream
> > tasks so 1 task is instantiated per partition.
> > Now, if the stream task is reading from multiple topics then the
> partitions
> > of multiple topics are shared among those stream tasks.
> >
> > For example, Topic A and B has 5 partitions each then 5 tasks are
> > instantiated and assigned to 5 stream threads where each task is
> assigned 1
> > partition from Topic A and Topic B.
> >
> > The question here is : if I would want 1 task to be created for each
> > partition from the input topic then is this possible? e.g. I would want
> to
> > have 5 tasks for topic A and 5 for B and then would want 10 threads to
> > handle those. How can this be achieved?
> >
>
>


Kafka streams parallelism - why not separate stream task per partition per input topic

2020-09-03 Thread Pushkar Deole
Hi,

I came across articles explaining how parallelism is handled in Kafka Streams.
This is what I gathered:
When the streams application reads from multiple topics, the topic with the
maximum number of partitions determines how many stream tasks are
instantiated, so one task is created per partition.
If a stream task reads from multiple topics, the partitions of those topics
are shared among the stream tasks.

For example, if Topic A and Topic B have 5 partitions each, then 5 tasks are
instantiated and assigned to 5 stream threads, where each task is assigned one
partition from Topic A and one from Topic B.

The question is: if I want one task to be created for each partition of each
input topic, is that possible? E.g. I would want 5 tasks for Topic A and 5 for
Topic B, and then 10 threads to handle them. How can this be achieved?


Re: kafka per topic metrics

2020-07-16 Thread Pushkar Deole
Thanks Liam... went through the book and got many of my doubts cleared.
I think that, apart from the BrokerTopicMetrics available at the broker, there
are many other per-topic metrics available at the consumers and producers. Are
you also fetching those client app metrics through the jmx exporter?

On Thu, Jul 16, 2020 at 11:16 AM Liam Clarke-Hutchinson <
liam.cla...@adscale.co.nz> wrote:

> Hi Pushkar,
>
> Best way to see what metrics are available is to connect to a broker via
> JConsole to see the exposed mbeans.
>
> You can iterate over them programmatically by using the MBean API.
>
> Also recommend chapter 10 of Kafka: The Definitive Guide, it covers the
> metrics really well.
>
> Cheers,
>
> Liam Clarke-Hutchinson
>
> On Thu, 16 Jul. 2020, 5:05 pm Pushkar Deole,  wrote:
>
> > Thanks Liam...
> > Few questions: in your pattern the topic parameter is appended pattern:
> > 'kafka.server > topic=(.+)><>OneMinuteRate'
> > however the kafka docs doesn't mention that
> > kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec
> >
> >   does the topic parameter available in all BrokerTopicMetrics and can
> the
> > broker provide that parameter dynamically by iterating through all topics
> > or selective topics? Where will the logic reside to iterate through
> > available topics and export metrics for all of them?
> >
> > On Thu, Jul 16, 2020 at 10:04 AM Liam Clarke-Hutchinson <
> > liam.cla...@adscale.co.nz> wrote:
> >
> > > Whoops, just spotted a typo - the second $1 in the above snippet should
> > of
> > > course be $2.
> > >
> > > On Thu, Jul 16, 2020 at 4:33 PM Liam Clarke-Hutchinson <
> > > liam.cla...@adscale.co.nz> wrote:
> > >
> > > > Hi Pushkar,
> > > >
> > > > There are broker side metrics for messages in / bytes in / bytes out
> > per
> > > > topic per second. I use this jmx_exporter rule to export them:
> > > >   - pattern: 'kafka.server<type=BrokerTopicMetrics, name=(.+), topic=(.+)><>OneMinuteRate'
> > > >     name: kafka_broker_per_topic_$1_one_minute_rate
> > > >     labels:
> > > >       topic: $1
> > > >     type: GAUGE
> > > >
> > > > You can't get the number of messages out per topic from the broker
> > > > because... I think it's somehow related to batching, or similar, it
> > > doesn't
> > > > count messages out, only bytes out. You can, however, get that metric
> > > from
> > > > the consumer if you're using the Java Kafka client, there's a
> per-topic
> > > > messages consumed per second metric exposed as an MBean.
> > > >
> > > > You could either use jmx_exporter to also export that from the client
> > > app,
> > > > or if possible, add some code that connects to the mbean inside the
> JVM
> > > and
> > > > then exports it via any pre-existing Prometheus registry. You might
> > want
> > > to
> > > > then use a Prometheus aggregating rule to collate all the
> per-consumer
> > > apps
> > > > into a per-consumer-group metric, unless the per consumer granularity
> > is
> > > of
> > > > interest to you: https://prometheus.io/docs/practices/rules/
> > > >
> > > > Hope that helps,
> > > >
> > > > Kind regards,
> > > >
> > > > Liam Clarke-Hutchinsons
> > > >
> > > > On Thu, 16 Jul. 2020, 3:46 pm Pushkar Deole, 
> > > wrote:
> > > >
> > > >> Thanks Claudia! For broker level metrics, we are also using same jmx
> > > >> exporter to export those metrics to prometheus.
> > > >> Are you fetching any per topic metrics from broker? e.g. messages
> > > produced
> > > >> on a certain topic or messages consumed from a certain topic. I am
> > > mainly
> > > >> interested in these metrics.
> > > >>
> > > >> I read in kafka docs that they are present at producer/consumer,
> > > however I
> > > >> am not sure how to fetch them from consumer/producer.
> > > >>
> > > >> On Wed, Jul 15, 2020 at 8:32 PM Claudia Kesslau <
> c.kess...@kasasi.de>
> > > >> wrote:
> > > >>
> > > >> > Hi,
> > > >> >
> > > >> > I use https://github.com/prometheus/jmx_exporter for collecting
> > > broker
> > > >> > metrics and integrating them into prometheus.
> > 

Re: kafka per topic metrics

2020-07-15 Thread Pushkar Deole
Thanks Liam...
A few questions: in your pattern a topic parameter is appended:
'kafka.server<type=BrokerTopicMetrics, name=(.+), topic=(.+)><>OneMinuteRate'
however the Kafka docs only mention
kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec (without a topic attribute)

Is the topic parameter available on all BrokerTopicMetrics, and can the broker
provide it dynamically by iterating through all topics or selective topics?
Where does the logic reside to iterate through the available topics and export
metrics for all of them?
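
For what it's worth, the broker registers a separate BrokerTopicMetrics MBean
per topic as data flows through it, so nothing needs to iterate over topics by
hand: a wildcard query (which is effectively what the jmx_exporter regex above
does) picks them all up. A sketch using the standard JMX API, to be run inside
the broker JVM or adapted with a JMXConnector for a remote broker:

import java.lang.management.ManagementFactory;
import java.util.Set;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Sketch: enumerate the per-topic MessagesInPerSec meters registered by the broker.
public class PerTopicBrokerMetrics {
    public static void main(final String[] args) throws Exception {
        final MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        final Set<ObjectName> names = server.queryNames(
                new ObjectName("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=*"),
                null);
        for (final ObjectName name : names) {
            System.out.printf("topic=%s OneMinuteRate=%s%n",
                    name.getKeyProperty("topic"),
                    server.getAttribute(name, "OneMinuteRate"));
        }
    }
}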

On Thu, Jul 16, 2020 at 10:04 AM Liam Clarke-Hutchinson <
liam.cla...@adscale.co.nz> wrote:

> Whoops, just spotted a typo - the second $1 in the above snippet should of
> course be $2.
>
> On Thu, Jul 16, 2020 at 4:33 PM Liam Clarke-Hutchinson <
> liam.cla...@adscale.co.nz> wrote:
>
> > Hi Pushkar,
> >
> > There are broker side metrics for messages in / bytes in / bytes out per
> > topic per second. I use this jmx_exporter rule to export them:
> >   - pattern: 'kafka.server<type=BrokerTopicMetrics, name=(.+), topic=(.+)><>OneMinuteRate'
> >     name: kafka_broker_per_topic_$1_one_minute_rate
> >     labels:
> >       topic: $1
> >     type: GAUGE
> >
> > You can't get the number of messages out per topic from the broker
> > because... I think it's somehow related to batching, or similar, it
> doesn't
> > count messages out, only bytes out. You can, however, get that metric
> from
> > the consumer if you're using the Java Kafka client, there's a per-topic
> > messages consumed per second metric exposed as an MBean.
> >
> > You could either use jmx_exporter to also export that from the client
> app,
> > or if possible, add some code that connects to the mbean inside the JVM
> and
> > then exports it via any pre-existing Prometheus registry. You might want
> to
> > then use a Prometheus aggregating rule to collate all the per-consumer
> apps
> > into a per-consumer-group metric, unless the per consumer granularity is
> of
> > interest to you: https://prometheus.io/docs/practices/rules/
> >
> > Hope that helps,
> >
> > Kind regards,
> >
> > Liam Clarke-Hutchinsons
> >
> > On Thu, 16 Jul. 2020, 3:46 pm Pushkar Deole, 
> wrote:
> >
> >> Thanks Claudia! For broker level metrics, we are also using same jmx
> >> exporter to export those metrics to prometheus.
> >> Are you fetching any per topic metrics from broker? e.g. messages
> produced
> >> on a certain topic or messages consumed from a certain topic. I am
> mainly
> >> interested in these metrics.
> >>
> >> I read in kafka docs that they are present at producer/consumer,
> however I
> >> am not sure how to fetch them from consumer/producer.
> >>
> >> On Wed, Jul 15, 2020 at 8:32 PM Claudia Kesslau 
> >> wrote:
> >>
> >> > Hi,
> >> >
> >> > I use https://github.com/prometheus/jmx_exporter for collecting
> broker
> >> > metrics and integrating them into prometheus.
> >> >
> >> > Hope this helps.
> >> > Greetings,
> >> > Claudia
> >> >
> >> > -Ursprüngliche Nachricht-
> >> > Von: Pushkar Deole 
> >> > Gesendet: Mittwoch, 15. Juli 2020 09:07
> >> > An: users@kafka.apache.org
> >> > Betreff: Re: kafka per topic metrics
> >> >
> >> > We are using prometheus as metrics collection and storage system and
> >> > Grafana for displaying those metrics, so integration with them is
> >> required
> >> >
> >> > On Wed, Jul 15, 2020 at 11:11 AM rohit garg 
> >> > wrote:
> >> >
> >> > > You can try using kafka manager and check it will fullfill most of
> >> > > requirement of yours.
> >> > >
> >> > > Thanks and Regards,
> >> > > Rohit
> >> > >
> >> > > On Wed, Jul 15, 2020, 10:33 Pushkar Deole 
> >> wrote:
> >> > >
> >> > > > Hi All,
> >> > > >
> >> > > > Any inputs as to how the kafka consumer and producer metrics can
> be
> >> > > hooked
> >> > > > up to a monitoring system such as prometheus ?
> >> > > >
> >> > > > On Tue, Jul 14, 2020 at 4:22 PM Pushkar Deole <
> pdeole2...@gmail.com
> >> >
> >> > > > wrote:
> >> > > >
> >> > > > > i did find these metrics from confluent docs: however how can i
> >> > > > > get
> >> > > this

Re: kafka per topic metrics

2020-07-15 Thread Pushkar Deole
Thanks Claudia! For broker-level metrics, we are also using the same jmx
exporter to export them to Prometheus.
Are you fetching any per-topic metrics from the broker, e.g. messages produced
to a certain topic or messages consumed from a certain topic? I am mainly
interested in these metrics.

I read in the Kafka docs that such metrics are present on the producer and
consumer side; however, I am not sure how to fetch them from the
consumer/producer.
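
One option, in case it helps: every Java producer and consumer exposes its
metrics programmatically through metrics(), not only via JMX, so an application
can read them and feed them into whatever registry it already uses. A sketch
for the consumer side; the metric and tag names are as I understand the
consumer fetch-manager metrics, so worth verifying against your client version:

import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

// Sketch: print the per-topic consumption rate of a running consumer.
public class ConsumerTopicMetrics {
    static void logPerTopicRates(final KafkaConsumer<?, ?> consumer) {
        for (final Map.Entry<MetricName, ? extends Metric> entry : consumer.metrics().entrySet()) {
            final MetricName name = entry.getKey();
            // Per-topic metrics carry a "topic" tag; the aggregate ones do not.
            if ("records-consumed-rate".equals(name.name()) && name.tags().containsKey("topic")) {
                System.out.printf("topic=%s records-consumed-rate=%s%n",
                        name.tags().get("topic"), entry.getValue().metricValue());
            }
        }
    }
}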

On Wed, Jul 15, 2020 at 8:32 PM Claudia Kesslau  wrote:

> Hi,
>
> I use https://github.com/prometheus/jmx_exporter for collecting broker
> metrics and integrating them into prometheus.
>
> Hope this helps.
> Greetings,
> Claudia
>
> -Ursprüngliche Nachricht-
> Von: Pushkar Deole 
> Gesendet: Mittwoch, 15. Juli 2020 09:07
> An: users@kafka.apache.org
> Betreff: Re: kafka per topic metrics
>
> We are using prometheus as metrics collection and storage system and
> Grafana for displaying those metrics, so integration with them is required
>
> On Wed, Jul 15, 2020 at 11:11 AM rohit garg 
> wrote:
>
> > You can try using kafka manager and check it will fullfill most of
> > requirement of yours.
> >
> > Thanks and Regards,
> > Rohit
> >
> > On Wed, Jul 15, 2020, 10:33 Pushkar Deole  wrote:
> >
> > > Hi All,
> > >
> > > Any inputs as to how the kafka consumer and producer metrics can be
> > hooked
> > > up to a monitoring system such as prometheus ?
> > >
> > > On Tue, Jul 14, 2020 at 4:22 PM Pushkar Deole 
> > > wrote:
> > >
> > > > i did find these metrics from confluent docs: however how can i
> > > > get
> > this
> > > > metric ? is it available at kafka broker?
> > > >
> > > > Per-Topic Metrics
> > > >
> > > > MBean:
> > > >
> > >
> > kafka.producer:type=producer-topic-metrics,client-id=([-.w]+),topic=([
> > -.w]+)
> > > > record-send-rateThe average number of records sent per second for
> > > > a
> > > topic.
> > > >
> > > > On Tue, Jul 14, 2020 at 3:27 PM Pushkar Deole
> > > > 
> > > > wrote:
> > > >
> > > >> Hi All,
> > > >>
> > > >> Need some help on kafka metrics, i am interested in certain
> > > >> metrics
> > e.g.
> > > >> i need to know the number of records published on a particular
> > > >> topic
> > and
> > > >> number of records consumed from that topic by a specific consumer
> > > group, i
> > > >> would need a total of these 2 and also average per second for them.
> > > >>
> > > >> Are those metrics available on kafka brokers and if yes then
> > > >> which are those metrics that would give me above counts?
> > > >>
> > > >
> > >
> >
>


Re: kafka per topic metrics

2020-07-15 Thread Pushkar Deole
We are using Prometheus as the metrics collection and storage system and
Grafana for displaying those metrics, so integration with them is required.

On Wed, Jul 15, 2020 at 11:11 AM rohit garg  wrote:

> You can try using kafka manager and check it will fullfill most of
> requirement of yours.
>
> Thanks and Regards,
> Rohit
>
> On Wed, Jul 15, 2020, 10:33 Pushkar Deole  wrote:
>
> > Hi All,
> >
> > Any inputs as to how the kafka consumer and producer metrics can be
> hooked
> > up to a monitoring system such as prometheus ?
> >
> > On Tue, Jul 14, 2020 at 4:22 PM Pushkar Deole 
> > wrote:
> >
> > > i did find these metrics from confluent docs: however how can i get
> this
> > > metric ? is it available at kafka broker?
> > >
> > > Per-Topic Metrics
> > >
> > > MBean:
> > >
> >
> kafka.producer:type=producer-topic-metrics,client-id=([-.w]+),topic=([-.w]+)
> > > record-send-rateThe average number of records sent per second for a
> > topic.
> > >
> > > On Tue, Jul 14, 2020 at 3:27 PM Pushkar Deole 
> > > wrote:
> > >
> > >> Hi All,
> > >>
> > >> Need some help on kafka metrics, i am interested in certain metrics
> e.g.
> > >> i need to know the number of records published on a particular topic
> and
> > >> number of records consumed from that topic by a specific consumer
> > group, i
> > >> would need a total of these 2 and also average per second for them.
> > >>
> > >> Are those metrics available on kafka brokers and if yes then which are
> > >> those metrics that would give me above counts?
> > >>
> > >
> >
>


Re: kafka per topic metrics

2020-07-14 Thread Pushkar Deole
Hi All,

Any inputs on how the Kafka consumer and producer metrics can be hooked up to
a monitoring system such as Prometheus?

On Tue, Jul 14, 2020 at 4:22 PM Pushkar Deole  wrote:

> i did find these metrics from confluent docs: however how can i get this
> metric ? is it available at kafka broker?
>
> Per-Topic Metrics
>
> MBean:
> kafka.producer:type=producer-topic-metrics,client-id=([-.w]+),topic=([-.w]+)
> record-send-rateThe average number of records sent per second for a topic.
>
> On Tue, Jul 14, 2020 at 3:27 PM Pushkar Deole 
> wrote:
>
>> Hi All,
>>
>> Need some help on kafka metrics, i am interested in certain metrics e.g.
>> i need to know the number of records published on a particular topic and
>> number of records consumed from that topic by a specific consumer group, i
>> would need a total of these 2 and also average per second for them.
>>
>> Are those metrics available on kafka brokers and if yes then which are
>> those metrics that would give me above counts?
>>
>


Re: kafka per topic metrics

2020-07-14 Thread Pushkar Deole
I did find these metrics in the Confluent docs; however, how can I get this
metric? Is it available at the Kafka broker?

Per-Topic Metrics

MBean:
kafka.producer:type=producer-topic-metrics,client-id=([-.\w]+),topic=([-.\w]+)
record-send-rate: The average number of records sent per second for a topic.
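
That producer-topic-metrics MBean lives on the producing client, not on the
broker, so it has to be read (or JMX-exported) from the producer's JVM. A small
sketch of reading it via producer.metrics(); the group and metric names are as
I understand them from the client metrics, so worth double-checking on your
version:

import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

// Sketch: look up the per-topic record-send-rate of a running producer.
public class ProducerTopicMetrics {
    static Double recordSendRate(final KafkaProducer<?, ?> producer, final String topic) {
        for (final Map.Entry<MetricName, ? extends Metric> entry : producer.metrics().entrySet()) {
            final MetricName name = entry.getKey();
            if ("record-send-rate".equals(name.name())
                    && "producer-topic-metrics".equals(name.group())
                    && topic.equals(name.tags().get("topic"))) {
                return (Double) entry.getValue().metricValue();
            }
        }
        return null; // nothing produced to this topic yet, so no per-topic metric exists
    }
}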

On Tue, Jul 14, 2020 at 3:27 PM Pushkar Deole  wrote:

> Hi All,
>
> Need some help on kafka metrics, i am interested in certain metrics e.g. i
> need to know the number of records published on a particular topic and
> number of records consumed from that topic by a specific consumer group, i
> would need a total of these 2 and also average per second for them.
>
> Are those metrics available on kafka brokers and if yes then which are
> those metrics that would give me above counts?
>

