Pushkar,

I don't know if this meets your needs, but I recently implemented
something similar in Samza (which I would classify as a hack, but it
works). My solution, sketched below, included:

- check for the pause condition on each message
- if the condition is met, then go into a while-true-sleep loop
- alternate the "sleep" command with one that polls for relevant messages
that might unpause the queue (in my case it's re-ingesting the broadcast
stream for updated configuration options; you might just read the message
at a certain offset to see if it is ready to be processed)
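
A rough sketch of that loop, in Java against Samza's low-level
StreamTask API (shouldPause(), pollForUnpause(), and handle() are
hypothetical application hooks, not Samza API):

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

public class PausableTask implements StreamTask {
    @Override
    public void process(IncomingMessageEnvelope envelope,
                        MessageCollector collector,
                        TaskCoordinator coordinator) throws Exception {
        // Check the pause condition on each message; while it holds,
        // alternate a short sleep with a poll for whatever might
        // unpause us (e.g. re-ingesting the broadcast stream).
        while (shouldPause(envelope)) {
            Thread.sleep(1000L);
            pollForUnpause();
        }
        handle(envelope, collector); // condition cleared; process normally
    }

    private boolean shouldPause(IncomingMessageEnvelope envelope) {
        return false; // app-specific pause condition
    }

    private void pollForUnpause() {
        // app-specific: refresh config/broadcast state
    }

    private void handle(IncomingMessageEnvelope envelope,
                        MessageCollector collector) {
        // app-specific processing
    }
}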

Now, this _will_ pause that entire partition while it waits for the
pause condition to clear, so bear that in mind. In our case that's
exactly what we want, but you may not want to halt processing entirely,
just postpone processing of this specific message.

Cheers,
Malcolm McFarland
Cavulus


On Mon, Sep 21, 2020 at 8:31 AM Bruno Cadonna <br...@confluent.io> wrote:

> Hi Pushkar,
>
> If you want to keep the order, you could still use the state store I
> suggested in my previous e-mail and implement a queue on top of it. For
> that, you need to put the events into the store with a key that
> represents the arrival order of the events. Each time a record is
> received from the input topic, the events are read in arrival order
> from the state store and the data in the global table is probed. If an
> event matches data from the global table, the event is removed from the
> state store and emitted. If an event does not match data from the
> global table, the processing is stopped and nothing is emitted.
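>
> A minimal sketch of that idea in Java (names are illustrative; "Event"
> is a placeholder for your value type and requiredDataAvailable() stands
> in for the probe into the global table):
>
> import org.apache.kafka.streams.processor.AbstractProcessor;
> import org.apache.kafka.streams.processor.ProcessorContext;
> import org.apache.kafka.streams.state.KeyValueStore;
>
> public class OrderedBufferProcessor extends AbstractProcessor<String, Event> {
>     private KeyValueStore<Long, Event> queue; // arrival-ordered buffer
>     private long head = 0L; // oldest buffered sequence number
>     private long tail = 0L; // next sequence number to assign
>
>     @Override
>     @SuppressWarnings("unchecked")
>     public void init(ProcessorContext context) {
>         super.init(context);
>         queue = (KeyValueStore<Long, Event>) context.getStateStore("event-queue");
>     }
>
>     @Override
>     public void process(String key, Event value) {
>         queue.put(tail++, value); // enqueue in arrival order
>         // Drain from the head, stopping at the first event whose
>         // required data is still missing, so order is preserved.
>         while (head < tail) {
>             Event buffered = queue.get(head);
>             if (!requiredDataAvailable(buffered)) {
>                 break; // emit nothing past this point
>             }
>             context().forward(key, buffered);
>             queue.delete(head++);
>         }
>     }
>
>     private boolean requiredDataAvailable(Event event) {
>         return true; // probe the global table here
>     }
> }
>
> (On restart, head and tail would have to be re-derived from the store;
> the sketch skips that for brevity.)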
>
> Best,
> Bruno
>
> On 21.09.20 14:21, Pushkar Deole wrote:
> > Bruno,
> >
> > 1. The topic backing the GlobalKTable is loaded by some other
> > service/application, so when my application starts up it just syncs
> > the GlobalKTable against that topic; if that other service/application
> > is still starting up, it may not have loaded its data onto that topic
> > yet, which is why the data is not available to my application through
> > the GlobalKTable.
> >
> > 2. I don't want out-of-order processing to happen; that's the reason I
> > want my streams application to keep retrying the same event until the
> > other application starts up and the required data becomes available in
> > the GlobalKTable.
> >
> >
> > On Mon, Sep 21, 2020 at 5:42 PM Bruno Cadonna <br...@confluent.io>
> > wrote:
> >
> >> Thank you for clarifying! Now, I think I understand.
> >>
> >> You could put events for which required data in the global table is
> >> not available into a state store, and each time an event from the
> >> input topic is processed, you could look up all events in your state
> >> store and see if required data is now available for them.
> >>
> >> However, be aware that this can mix up the original order of the
> >> events in your input topic if required data of later events becomes
> >> available before required data of earlier events. Furthermore, you
> >> need to consider the case where you have a huge number of events in
> >> the state store and suddenly all required data in the global table
> >> becomes available, because processing all those events at once might
> >> exceed max.poll.interval.ms, and the stream thread might be kicked
> >> out of the consumer group. To solve that, you may want to limit the
> >> number of events processed at once. You also need to keep the state
> >> store from growing indefinitely if required data in the global table
> >> is not available for a long time or not available at all. Maybe none
> >> of these caveats apply to your use case.
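> >>
> >> For instance, the cap mentioned above could look like this inside the
> >> processor (illustrative fragment; MAX_DRAIN, the "buffered" store, and
> >> requiredDataAvailable() are hypothetical):
> >>
> >> // Drain at most MAX_DRAIN buffered events per input record so a
> >> // single process() call cannot run past max.poll.interval.ms.
> >> int drained = 0;
> >> try (KeyValueIterator<Long, Event> it = buffered.all()) {
> >>     while (it.hasNext() && drained < MAX_DRAIN) {
> >>         KeyValue<Long, Event> entry = it.next();
> >>         if (requiredDataAvailable(entry.value)) {
> >>             context().forward(entry.key, entry.value);
> >>             buffered.delete(entry.key);
> >>             drained++;
> >>         }
> >>     }
> >> }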
> >>
> >> Best,
> >> Bruno
> >>
> >>
> >> On 21.09.20 13:45, Pushkar Deole wrote:
> >>> Say the application-level exception is named
> >>> MeasureDefinitionNotAvaialbleException.
> >>>
> >>> What I am trying to achieve is: in the above case, when event
> >>> processing fails because required data is not available, the stream
> >>> should not proceed to the next event; it should wait for the
> >>> processing of the current event to complete. If I just catch the
> >>> MeasureDefinitionNotAvaialbleException in the processor and log it,
> >>> the stream will proceed to the next event, treating the current
> >>> event as successfully processed, right?
> >>>
> >>> On Mon, Sep 21, 2020 at 5:11 PM Pushkar Deole <pdeole2...@gmail.com>
> >>> wrote:
> >>>
> >>>> It is not a Kafka Streams error; it is an application-level error,
> >>>> e.g., some data required for processing an input event is not yet
> >>>> available in the GlobalKTable because it has not been synced with
> >>>> the global topic.
> >>>>
> >>>> On Mon, Sep 21, 2020 at 4:54 PM Bruno Cadonna <br...@confluent.io>
> >>>> wrote:
> >>>>
> >>>>> Hi Pushkar,
> >>>>>
> >>>>> Is the error you are talking about one that is thrown by Kafka
> >>>>> Streams or by your application? If it is thrown by Kafka Streams,
> >>>>> could you please post the error?
> >>>>>
> >>>>> I do not completely understand what you are trying to achieve, but
> >>>>> maybe max.task.idle.ms [1] is the configuration you are looking for.
> >>>>>
> >>>>> I can assure you that enable.auto.commit is false in Kafka Streams.
> >>>>> What you probably mean is that Kafka Streams periodically commits
> >>>>> the offsets. The commit interval can be controlled with
> >>>>> commit.interval.ms [2].
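> >>>>>
> >>>>> For example (illustrative values, using the StreamsConfig constants
> >>>>> for those two settings):
> >>>>>
> >>>>> import java.util.Properties;
> >>>>> import org.apache.kafka.streams.StreamsConfig;
> >>>>>
> >>>>> Properties props = new Properties();
> >>>>> // max.task.idle.ms: how long a task waits for data on all of its
> >>>>> // input partitions before processing what it already has
> >>>>> props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 1000L);
> >>>>> // commit.interval.ms: how often Kafka Streams commits offsets
> >>>>> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000L);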
> >>>>>
> >>>>>
> >>>>> Best,
> >>>>> Bruno
> >>>>>
> >>>>>
> >>>>> [1] https://kafka.apache.org/documentation/#max.task.idle.ms
> >>>>> [2] https://kafka.apache.org/documentation/#commit.interval.ms
> >>>>>
> >>>>> On 21.09.20 12:38, Pushkar Deole wrote:
> >>>>>> Hi,
> >>>>>>
> >>>>>> I would like to know how to handle the following scenario while
> >>>>>> processing events in a Kafka Streams application:
> >>>>>>
> >>>>>> 1. The streams application needs data from a GlobalKTable, which
> >>>>>> loads it from a topic populated by some other service/application.
> >>>>>> If the streams application starts getting events from its input
> >>>>>> source topic but doesn't find the required data in the GlobalKTable
> >>>>>> (because that other application/service hasn't loaded it yet), then
> >>>>>> the Kafka Streams application gets an error while processing the
> >>>>>> event, handles the exception by logging an error, and goes on to
> >>>>>> process other events. Since auto.commit is true, polling will keep
> >>>>>> fetching the next batch and will probably commit the offsets of the
> >>>>>> previous batch, causing loss of the events that hit an exception
> >>>>>> during processing.
> >>>>>>
> >>>>>> I want to halt the processing here if an error occurs while
> >>>>>> processing an event: instead of going on to the next event, the
> >>>>>> processing should keep retrying the previous event until the
> >>>>>> application-level error is resolved. How can I achieve this?
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> >
>
