Hello Greg,

You can pass the underlying consumer and producer client configs into
StreamsConfig as well, e.g.:

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
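
For context, a rough end-to-end sketch (the application id and broker
address below are just placeholders):

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
StreamsConfig config = new StreamsConfig(props);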


As for your case: since ConsumerConfig.AUTO_OFFSET_RESET_CONFIG takes
effect universally on all topics that do not have committed offsets, it is
not sufficient on its own.

Instead, you can set AUTO_OFFSET_RESET_CONFIG to "latest" and manually
commit the offsets for your table's topic back to the log start offset (0)
using a plain consumer (note that you need to set the consumer's group-id
to be the same as your streams application-id). Roughly, in Java:

-----------

consumer.subscribe(Collections.singletonList("the-topic-you-are-piping-your-table-to"));
consumer.poll(0);  // dummy poll to join the group and get the partition assignment

for (TopicPartition partition : consumer.assignment())
    consumer.commitSync(Collections.singletonMap(
        partition, new OffsetAndMetadata(0L)));  // log start offset

-----------
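
You would run this once, before starting the Streams application. Since
committed offsets take precedence over auto.offset.reset, the table's
topic will then be consumed from the beginning, while all other topics
start from their latest offsets.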

Guozhang


On Fri, Mar 25, 2016 at 2:14 PM, Greg Fodor <gfo...@gmail.com> wrote:

> Hey Jay, thanks -- the timestamp alignment makes sense and clears up
> why prioritization isn't needed. It sounds like the one thing
> I still don't understand is how we can tell Kafka Streams to start
> reading from the latest or earliest offset for a given source.
>
> I might be thinking about this the wrong way, but what I'm hoping to
> be able to do easily is materialize a subset of our database tables
> that are being streamed to Kafka via Kafka Connect for joining, etc,
> in the streaming job. Meanwhile, we have other (higher volume) topics
> that are streams where we want to begin processing at the latest
> message after kicking off the job, the historical data in the topic is
> irrelevant. So for the table, I'd want to consume from the beginning
> (and rely upon the timestamp alignment to effectively block the other
> streams until it's caught up) whereas for the other streams I want to
> start consuming at the latest offset. What's the best approach for
> solving this?
>
> On Fri, Mar 25, 2016 at 12:05 PM, Jay Kreps <j...@confluent.io> wrote:
> > Hey Greg,
> >
> > I think the use case you have is that you have some derived state and you
> > want to make sure it is fully populated before any of the input streams
> > are processed. There would be two ways to accomplish this: (1) have a
> > changelog that captures the state, (2) reprocess the input to recreate
> > the derived
> > state. I think Kafka Streams does (1) automatically, but (2) can
> > potentially be more efficient in certain cases and you want to find a way
> > to take advantage of that optimization if I understand correctly.
> >
> > What would be required to make (2) work?
> >
> > You describe the need as fully processing some of the inputs, but I
> > actually think the default behavior in Kafka Streams may already handle
> > this better--it will try to align the inputs by timestamp. This should
> > actually be better in terms of keeping the state and the events at
> > similar points in time, which I think is what you really want.
> >
> > However, to accomplish what you want, I think you need to be able to not
> > commit offsets against the input used to populate the state store (so
> > that it will always get reprocessed), and you need to be able to disable
> > the changelog.
> >
> > -Jay
> >
> > On Fri, Mar 25, 2016 at 11:55 AM, Greg Fodor <gfo...@gmail.com> wrote:
> >
> >> That's great. Is there anything I can do to force that particular
> >> topic to start at the earliest message every time the job is started?
> >> On Mar 25, 2016 10:20 AM, "Yasuhiro Matsuda" <yasuhiro.mats...@gmail.com>
> >> wrote:
> >>
> >> > It may not be ideal, but there is a way to prioritize particular
> >> > topics. It is to set the record timestamps to zero. This can be done
> >> > by using a custom TimestampExtractor. Kafka Streams tries to
> >> > synchronize multiple streams using the extracted timestamps. So,
> >> > records with the timestamp 0 have a greater chance of being processed
> >> > earlier than others.
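> >> >
> >> > A rough sketch of such an extractor (the class name and the
> >> > "my-table-topic" name are hypothetical, and falling back to
> >> > wall-clock time for the other topics is just one possible choice):
> >> >
> >> > public class TablePrioritizingExtractor implements TimestampExtractor {
> >> >     @Override
> >> >     public long extract(ConsumerRecord<Object, Object> record) {
> >> >         // Report timestamp 0 for the table's topic so its records
> >> >         // sort first; everything else gets wall-clock time.
> >> >         if (record.topic().equals("my-table-topic"))
> >> >             return 0L;
> >> >         return System.currentTimeMillis();
> >> >     }
> >> > }
> >> >
> >> > props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
> >> >           TablePrioritizingExtractor.class.getName());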
> >> >
> >> > On Thu, Mar 24, 2016 at 6:57 PM, Greg Fodor <gfo...@gmail.com> wrote:
> >> >
> >> > > Really digging Kafka Streams so far, nice work all. I'm interested
> >> > > in being able to materialize one or more KTables in full before the
> >> > > rest of the topology begins processing messages. This seems
> >> > > fundamentally
> >> > > useful since it allows you to get your database tables replicated up
> >> > > off the change stream topics from Connect before the stream
> >> > > processing workload starts.
> >> > >
> >> > > In Samza we have bootstrap streams and stream prioritization to help
> >> > > facilitate this. What seems desirable for Kafka Streams is:
> >> > >
> >> > > - Per-source prioritization (by defaulting to >0, setting the stream
> >> > > priority to 0 effectively bootstraps it.)
> >> > > - Per-source initial offset settings (earliest or latest, default to
> >> > > latest)
> >> > >
> >> > > To solve the KTable materialization problem, you'd set priority to 0
> >> > > for its source and the source offset setting to earliest.
> >> > >
> >> > > Right now it appears the only control you have for re-processing is
> >> > > AUTO_OFFSET_RESET_CONFIG, but I believe this is a global setting for
> >> > > the consumers, and hence, the entire job. Beyond that, I don't see
> >> > > any way to prioritize stream consumption at all, so your KTables
> >> > > will be
> >> > > getting materialized while the general stream processing work is
> >> > > running concurrently.
> >> > >
> >> > > I wanted to see if this case is actually supported already and I'm
> >> > > missing something, or if not, if these options make sense. If this
> >> > > seems reasonable and it's not too complicated, I could possibly
> >> > > try to get together a patch. If so, any tips on implementing this
> >> > > would be
> >> > > helpful as well. Thanks!
> >> > >
> >> > > -Greg
> >> > >
> >> >
> >>
>



-- 
-- Guozhang
