Hi Raman,

Since you are already using `transformation`, which is a lower-level API in
the DSL, you can run arbitrary logic inside your `process()` or
`transform()` function, including handing records to a thread pool. Note
that, as the consumer docs you quoted say, `Typically, you must disable
automatic commits and manually commit processed offsets for records... `,
so offset committing becomes trickier. You would probably also need to turn
off the time-based commit and call `context.commit()` yourself once you are
certain that every record that has gone through this transform so far has
finished processing.
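
To make the bookkeeping concrete, here is a rough sketch of the "commit only
when nothing is in flight" idea. It is plain Java with no Kafka dependency:
`InFlightTracker`, `submit()` and `commitIfIdle()` are hypothetical names I
made up for illustration, and the `commit` Runnable stands in for your call
to `context.commit()`. It assumes both `submit()` and `commitIfIdle()` are
called from the stream thread (for example from `transform()` or a
punctuator), since as far as I know the processor context should not be
touched from worker threads.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper: counts records handed to a worker pool. */
class InFlightTracker {
    private final ExecutorService pool;
    private final AtomicInteger inFlight = new AtomicInteger();

    InFlightTracker(int threads) {
        this.pool = Executors.newFixedThreadPool(threads);
    }

    /** Hand slow work to the pool; intended to be called from transform(). */
    void submit(Runnable work) {
        inFlight.incrementAndGet();          // counted before the task can run
        pool.execute(() -> {
            try {
                work.run();
            } finally {
                // Sketch only: a failed task is still counted as done here.
                inFlight.decrementAndGet();
            }
        });
    }

    /** Runs commit (a stand-in for context.commit()) only if nothing is pending. */
    boolean commitIfIdle(Runnable commit) {
        if (inFlight.get() == 0) {
            commit.run();
            return true;
        }
        return false;
    }

    /** Stop accepting work and wait briefly for running tasks to finish. */
    void close() {
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        InFlightTracker tracker = new InFlightTracker(2);
        CountDownLatch gate = new CountDownLatch(1);
        // Simulated long-running record: blocks until the gate opens.
        tracker.submit(() -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        System.out.println(tracker.commitIfIdle(() -> {})); // false: work pending
        gate.countDown();
        tracker.close();
        System.out.println(tracker.commitIfIdle(() -> {})); // true: pool drained
    }
}
```

In a real topology you would also need to decide what happens when a task
fails (this sketch would still let the commit through), and you would set
`commit.interval.ms` high enough that the time-based commit does not fire
while work is still pending.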

There is an ongoing discussion about adding async processing as an
out-of-the-box feature, also as a way to further improve elastic
scalability. Feel free to share your thoughts:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-408%3A+Add+Asynchronous+Processing+To+Kafka+Streams

Guozhang


On Wed, May 22, 2019 at 6:57 AM Raman Gupta <rocketra...@gmail.com> wrote:

> I have a situation in which I have a stream that does a
> transformation. This transformation can take as little as 10-30s or up
> to about 15 minutes to run. The stream works just fine, until a
> rebalance happens in the middle of long processing. Rebalancing
> sometimes takes a long time, and sometimes, a new rebalance is started
> by Kafka soon after the previous one completes, and this pattern
> usually continues for some time.
>
> Reading the docs for Kafka consumer, I see this gem:
>
> > For use cases where message processing time varies unpredictably,
> > neither of these options may be sufficient. The recommended way to handle
> > these cases is to move message processing to another thread, which allows
> > the consumer to continue calling poll while the processor is still working.
> > Some care must be taken to ensure that committed offsets do not get ahead
> > of the actual position. Typically, you must disable automatic commits and
> > manually commit processed offsets for records only after the thread has
> > finished handling them (depending on the delivery semantics you need). Note
> > also that you will need to pause the partition so that no new records are
> > received from poll until after thread has finished handling those
> > previously returned.
>
> Ok awesome, that sounds easy enough and seems to be exactly what I
> need. Doing the processing in a separate thread would make the
> rebalance a snap.
>
> However, is there a way to do this, or something similar, with
> streams? I would like to use the streams abstraction rather than the
> consumer/producer APIs directly.
>
> Regards,
> Raman
>


-- 
-- Guozhang
