I see. If you do the step of storing the end offsets in your database
before starting up Streams, this would work.
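For illustration, a minimal sketch of how those end offsets could be
captured before the Streams application starts, using a plain
KafkaConsumer (the helper name snapshotEndOffsets is made up here, and
writing the result to your database is left out):

import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class EndOffsetSnapshot {

    // Returns the current end offset (i.e., the offset of the next record to
    // be written) per partition of the given topic; this map would be stored
    // in the database before starting the Streams application.
    public static Map<TopicPartition, Long> snapshotEndOffsets(String bootstrapServers,
                                                               String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
                    .map(info -> new TopicPartition(info.topic(), info.partition()))
                    .collect(Collectors.toList());
            return consumer.endOffsets(partitions);
        }
    }
}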
What you could do as a workaround (even if it might not be a nice
solution) is to apply a `transform()` as your first operator. Within
`transform()` you get access to the current record offset via
`context.offset()` (the context object is provided via `init()`). Thus,
you can implement an "offset filter" and also track whether all
partitions have reached their end offset (you also get a record's
partition via the context). So if a record is past its partition's end
offset, you just filter the record out. Once all partitions have
reached their end offsets, you can set a flag to notify your "main"
thread to close() the Kafka Streams instance. (A rough sketch of this
idea follows after the quoted thread below.) Does this make sense?

-Matthias

On 5/8/17 12:49 PM, Timur Yusupov wrote:
> Matthias,
>
> Thanks for your answers.
>
>>> So we are considering just pausing specific topic partitions as soon
>>> as we reach the stop offsets for them.
>> I am just wondering how you would do this in a fault-tolerant way (if
>> you had a pause API)?
> At the start of a batch cycle we have to store somewhere (for our use
> case the database we already use will work) the end offsets for the
> topic partitions we are interested in. Then we just need to process
> all messages up to the stored end offsets. In case the application is
> restarted, it first checks the database for the stored end offsets.
>
>>> 2) Assume we process multiple topics in some parallel way and want
>>> to pause some topics while waiting for other topics to catch up.
>> Streams synchronizes topics on time automatically for you. So I am
>> wondering why this does not work for you?
> Right, this is probably a bad example, but use case 1) with batch
> processing is still relevant.
>
>> -Matthias
>>
>> On 4/27/17 8:52 AM, Timur Yusupov wrote:
>>> I see it is possible to pause specific topic partition consumption
>>> when using KafkaConsumer directly, but it looks like it is not
>>> possible when using KafkaStreams.
>>>
>>> There are the following use cases for that:
>>> 1) Doing batch processing using Kafka Streams (I found the
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-95%3A+Incremental+Batch+Processing+for+Kafka+Streams
>>> proposal for Kafka Streams, but according to
>>> https://issues.apache.org/jira/browse/KAFKA-4437 there is no active
>>> development on that side). So we are considering just pausing
>>> specific topic partitions as soon as we reach the stop offsets for
>>> them.
>>>
>>> 2) Assume we process multiple topics in some parallel way and want
>>> to pause some topics while waiting for other topics to catch up.
>>>
>>> Actually, the first use case is more important for us, so it would
>>> be good to know whether there is a way, or whether improvements are
>>> already planned, to allow pausing specific topic partition
>>> consumption in KafkaStreams.