Hello Greg,

You can pass underlying client configs for the consumer and producer into StreamsConfig as well, e.g.:

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
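In context, a minimal sketch of a full Streams config (the application id and broker address below are placeholders, and exact config constants may vary slightly across versions):

-----------
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
// plain consumer configs are forwarded to the embedded consumers:
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
StreamsConfig config = new StreamsConfig(props);
-----------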
As for your case: ConsumerConfig.AUTO_OFFSET_RESET_CONFIG takes effect universally, on all topics that do not have committed offsets, and hence it is not sufficient by itself. Instead, you can set AUTO_OFFSET_RESET_CONFIG to "latest" and manually commit the log start offset for your table's topic using a plain consumer (note that you need to set the consumer's group.id to be the same as your Streams application.id):

-----------
consumer.subscribe(Collections.singletonList("the-topic-you-are-piping-your-table-to"));
while (consumer.assignment().isEmpty())
    consumer.poll(100); // wait for the rebalance to assign partitions
for (TopicPartition partition : consumer.assignment())
    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(0L))); // log start offset
-----------

Guozhang

On Fri, Mar 25, 2016 at 2:14 PM, Greg Fodor <gfo...@gmail.com> wrote:
> Hey Jay, thanks -- the timestamp alignment makes sense and clears up
> why prioritization isn't needed. It sounds like the one thing I still
> don't understand is how we can tell Kafka Streams to start reading
> from the latest or earliest offset for a given source.
>
> I might be thinking about this the wrong way, but what I'm hoping to
> be able to do easily is materialize a subset of our database tables
> that are being streamed to Kafka via Kafka Connect, for joining, etc.,
> in the streaming job. Meanwhile, we have other (higher-volume) topics
> that are streams where we want to begin processing at the latest
> message after kicking off the job; the historical data in those topics
> is irrelevant. So for the table I'd want to consume from the beginning
> (and rely upon the timestamp alignment to effectively block the other
> streams until it's caught up), whereas for the other streams I want to
> start consuming at the latest offset. What's the best approach for
> solving this?
>
> On Fri, Mar 25, 2016 at 12:05 PM, Jay Kreps <j...@confluent.io> wrote:
> > Hey Greg,
> >
> > I think the use case you have is that you have some derived state and
> > you want to make sure it is fully populated before any of the input
> > streams are processed. There would be two ways to accomplish this: (1)
> > have a changelog that captures the state, or (2) reprocess the input
> > to recreate the derived state. I think Kafka Streams does (1)
> > automatically, but (2) can potentially be more efficient in certain
> > cases, and you want to find a way to take advantage of that
> > optimization, if I understand correctly.
> >
> > What would be required to make (2) work?
> >
> > You describe the need as fully processing some of the inputs, but I
> > actually think the default behavior in Kafka Streams may already
> > handle this better--it will try to align the inputs by timestamp. This
> > should actually be better in terms of keeping the state and the events
> > at similar points in time, which I think is what you really want.
> >
> > However, to accomplish what you want, I think you need to be able to
> > not commit offsets against the input used to populate the state store,
> > so that it will always get reprocessed, and you need to be able to
> > disable the changelog.
> >
> > -Jay
> >
> > On Fri, Mar 25, 2016 at 11:55 AM, Greg Fodor <gfo...@gmail.com> wrote:
> >
> >> That's great. Is there anything I can do to force that particular
> >> topic to start at the earliest message every time the job is started?
> >> On Mar 25, 2016 10:20 AM, "Yasuhiro Matsuda"
> >> <yasuhiro.mats...@gmail.com> wrote:
> >>
> >> > It may not be ideal, but there is a way to prioritize particular
> >> > topics: set the record timestamps to zero. This can be done by
> >> > using a custom TimestampExtractor. Kafka Streams tries to
> >> > synchronize multiple streams using the extracted timestamps, so
> >> > records with timestamp 0 have a greater chance of being processed
> >> > earlier than others.
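To illustrate the suggestion above, a sketch of such an extractor (the class and topic names are made up, and the exact TimestampExtractor signature may differ across Kafka versions -- early versions pass only the record):

-----------
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class BootstrapFirstTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record) {
        // Records from the table topic always sort first in the
        // timestamp-based synchronization across streams.
        if (record.topic().equals("my-table-topic")) // hypothetical topic
            return 0L;
        return record.timestamp(); // other topics keep their own timestamps
    }
}
-----------

It would be registered with something like
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, BootstrapFirstTimestampExtractor.class.getName());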
> >> >
> >> > On Thu, Mar 24, 2016 at 6:57 PM, Greg Fodor <gfo...@gmail.com> wrote:
> >> >
> >> > > Really digging Kafka Streams so far, nice work all. I'm interested
> >> > > in being able to materialize one or more KTables in full before
> >> > > the rest of the topology begins processing messages. This seems
> >> > > fundamentally useful, since it allows you to get your database
> >> > > tables replicated off the Connect change-stream topics before the
> >> > > stream processing workload starts.
> >> > >
> >> > > In Samza we have bootstrap streams and stream prioritization to
> >> > > help facilitate this. What seems desirable for Kafka Streams is:
> >> > >
> >> > > - Per-source prioritization (by defaulting to >0, setting the
> >> > > stream priority to 0 effectively bootstraps it)
> >> > > - Per-source initial offset settings (earliest or latest,
> >> > > defaulting to latest)
> >> > >
> >> > > To solve the KTable materialization problem, you'd set the
> >> > > priority to 0 for its source and the source offset setting to
> >> > > earliest.
> >> > >
> >> > > Right now it appears the only control you have for reprocessing is
> >> > > AUTO_OFFSET_RESET_CONFIG, but I believe this is a global setting
> >> > > for the consumers, and hence for the entire job. Beyond that, I
> >> > > don't see any way to prioritize stream consumption at all, so your
> >> > > KTables will be getting materialized while the general stream
> >> > > processing work is running concurrently.
> >> > >
> >> > > I wanted to see if this case is actually supported already and I'm
> >> > > missing something, or if not, whether these options make sense. If
> >> > > this seems reasonable and it's not too complicated, I could
> >> > > possibly try to put together a patch. If so, any tips on
> >> > > implementing it would be helpful as well. Thanks!
> >> > >
> >> > > -Greg
> >> > >
> >> >

--
-- Guozhang