Hi Raman,

Since you are already using `transform()`, which is a lower-level API in the DSL, you can do basically arbitrary logic inside your `process()` or `transform()` function, such as handing records off to a thread pool. Note that, as the consumer docs you quoted mention ("Typically, you must disable automatic commits and manually commit processed offsets for records..."), offset committing is the tricky part: you'd probably also need to turn off the time-based commit (for example, by setting `commit.interval.ms` to a very large value) and call `context.commit()` yourself once you are certain that all records that have gone through this transformer so far have finished processing.
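Here is a rough sketch of the bookkeeping such a transformer might need, assuming a `Transformer` whose `transform()` hands each record to a thread pool and returns `null`, so nothing is forwarded synchronously. `AsyncRecordTracker` and its methods are hypothetical names, not part of Kafka Streams. The idea: call `submit()` from `transform()`; in `init()`, register a wall-clock punctuator with `context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, ts -> ...)` that calls `drainCompleted((k, v) -> context.forward(k, v))` and then `context.commit()` if `safeToCommit()` is true; and call `shutdown()` from `close()`. Results are forwarded from the punctuator rather than from the worker threads because the `ProcessorContext` is not thread-safe and should only be used from the stream thread.

```java
import java.util.AbstractMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Function;

// Hypothetical helper, NOT a Kafka Streams API: bookkeeping a Transformer could
// use to offload slow work to a thread pool and to tell when every record it has
// accepted so far has been fully processed and forwarded.
final class AsyncRecordTracker<K, V, R> {
    private final ExecutorService pool;
    private final Function<V, R> slowWork;
    // Results produced by worker threads, waiting to be forwarded downstream.
    private final Queue<Map.Entry<K, R>> completed = new ConcurrentLinkedQueue<>();
    // Records handed to the pool whose results have not been forwarded yet.
    private final AtomicInteger inFlight = new AtomicInteger();

    AsyncRecordTracker(int threads, Function<V, R> slowWork) {
        this.pool = Executors.newFixedThreadPool(threads);
        this.slowWork = slowWork;
    }

    // Called from transform() on the stream thread: queue the work, return at once.
    // If slowWork throws, the record never leaves the in-flight count, so
    // safeToCommit() stays false; a real implementation needs an error policy.
    void submit(K key, V value) {
        inFlight.incrementAndGet();
        // SimpleImmutableEntry (unlike Map.entry) tolerates null keys and values.
        pool.execute(() -> completed.add(
                new AbstractMap.SimpleImmutableEntry<>(key, slowWork.apply(value))));
    }

    // Called from a punctuator on the stream thread: hands each finished result
    // to `forward` (e.g. a lambda calling context.forward) and returns the count.
    int drainCompleted(BiConsumer<K, R> forward) {
        int forwarded = 0;
        Map.Entry<K, R> entry;
        while ((entry = completed.poll()) != null) {
            forward.accept(entry.getKey(), entry.getValue());
            inFlight.decrementAndGet(); // a record only counts as done once forwarded
            forwarded++;
        }
        return forwarded;
    }

    // True when every submitted record has been processed and forwarded.
    boolean safeToCommit() {
        return inFlight.get() == 0;
    }

    int inFlightCount() {
        return inFlight.get();
    }

    // Called from close(): stop accepting work and wait for running tasks.
    // Returns false if the timeout elapsed or the wait was interrupted.
    boolean shutdown(long timeout, TimeUnit unit) {
        pool.shutdown();
        try {
            return pool.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A few caveats with this approach: results may be forwarded in a different order than they came in, and nothing here bounds how many records pile up in the pool (as far as I know, a transformer cannot pause its input partitions the way a plain consumer can). Also, `context.commit()` only requests a commit that the stream thread carries out a bit later, and Streams may also commit on its own when tasks are closed or migrated during a rebalance, so this narrows the window for committing ahead of the actual processing rather than closing it completely.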
There's an ongoing discussion about adding async processing as an out-of-the-box feature, which would also help further improve elastic scalability. Feel free to share your thoughts:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-408%3A+Add+Asynchronous+Processing+To+Kafka+Streams

Guozhang

On Wed, May 22, 2019 at 6:57 AM Raman Gupta <rocketra...@gmail.com> wrote:

> I have a situation in which I have a stream that does a
> transformation. This transformation can take as little as 10-30s or up
> to about 15 minutes to run. The stream works just fine, until a
> rebalance happens in the middle of long processing. Rebalancing
> sometimes takes a long time, and sometimes, a new rebalance is started
> by Kafka soon after the previous one completes, and this pattern
> usually continues for some time.
>
> Reading the docs for Kafka consumer, I see this gem:
>
> > For use cases where message processing time varies unpredictably,
> > neither of these options may be sufficient. The recommended way to handle
> > these cases is to move message processing to another thread, which allows
> > the consumer to continue calling poll while the processor is still working.
> > Some care must be taken to ensure that committed offsets do not get ahead
> > of the actual position. Typically, you must disable automatic commits and
> > manually commit processed offsets for records only after the thread has
> > finished handling them (depending on the delivery semantics you need). Note
> > also that you will need to pause the partition so that no new records are
> > received from poll until after thread has finished handling those
> > previously returned.
>
> Ok awesome, that sounds easy enough and seems to be exactly what I
> need. Doing the processing in a separate thread would make the
> rebalance a snap.
>
> However, is there a way to do this, or something similar, with
> streams? I would like to use the streams abstraction rather than the
> consumer/producer APIs directly.
>
> Regards,
> Raman