Re: KStream/KTable prioritization/initial offset

2016-03-25 Thread Guozhang Wang
Hello Greg,

You can also pass the underlying consumer and producer client configs
into StreamsConfig, e.g.:

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
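Here is a slightly fuller sketch of the same idea, using plain `java.util.Properties`. The literal keys are the string values of `StreamsConfig.APPLICATION_ID_CONFIG`, `StreamsConfig.BOOTSTRAP_SERVERS_CONFIG` and `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG`. They are written out so the snippet compiles without Kafka on the classpath. The class and method names are hypothetical.

```java
import java.util.Properties;

// Sketch: Streams properties that also carry a consumer-level setting.
// Literal keys stand in for StreamsConfig.APPLICATION_ID_CONFIG,
// StreamsConfig.BOOTSTRAP_SERVERS_CONFIG and ConsumerConfig.AUTO_OFFSET_RESET_CONFIG.
class StreamsProps {
    static Properties build(String applicationId, String bootstrapServers, String offsetReset) {
        Properties props = new Properties();
        props.put("application.id", applicationId);     // Streams also uses this as the consumer group.id
        props.put("bootstrap.servers", bootstrapServers);
        props.put("auto.offset.reset", offsetReset);     // consumer config, passed through to Streams' consumers
        return props;
    }
}
```

Note that this single `auto.offset.reset` value applies to every source topic of the application.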


As for your case, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG applies to every
topic that does not have a committed offset, so on its own it is not
sufficient.

Instead, you can set AUTO_OFFSET_RESET_CONFIG to "latest" and manually
commit the log start offset for your table's source topic using a
consumer (note that you need to set the consumer's group.id to be the same
as your Streams application.id):

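---

// Run before starting Streams; consumer's group.id == Streams application.id.
String topic = "the-topic-you-are-piping-your-table-to";
List<TopicPartition> partitions = new ArrayList<>();
for (PartitionInfo p : consumer.partitionsFor(topic))
    partitions.add(new TopicPartition(topic, p.partition()));
consumer.assign(partitions);
consumer.seekToBeginning(partitions);  // log start offset; may be > 0 after retention
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (TopicPartition tp : partitions)
    offsets.put(tp, new OffsetAndMetadata(consumer.position(tp)));
consumer.commitSync(offsets);

---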
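The following toy model shows why committing the table topic's offset while the reset policy is "latest" gives per-topic behaviour. It is not Kafka's actual code. It describes how a consumer picks its starting offset for a partition: it uses the committed offset if that offset is still inside the log, and otherwise falls back to `auto.offset.reset`. The class and method names are hypothetical, and the offsets used to exercise it are made-up numbers.

```java
// Toy model (not Kafka's code) of how a consumer picks where to start reading a partition.
class StartOffsetModel {
    // committed: the group's committed offset, or null if none exists.
    // logStart/logEnd: the first retained offset and the next offset to be written.
    static long startOffset(Long committed, long logStart, long logEnd, String reset) {
        boolean inRange = committed != null && committed >= logStart && committed <= logEnd;
        if (inRange) {
            return committed;             // resume from the committed position
        }
        switch (reset) {                  // no usable commit: apply auto.offset.reset
            case "earliest":
                return logStart;
            case "latest":
                return logEnd;
            default:                      // e.g. "none": the real consumer raises an error
                throw new IllegalStateException("no valid committed offset and reset=" + reset);
        }
    }
}
```

Under this model, with the policy set to "latest", the table topic (committed at its log start) is read from the beginning, while stream topics with no commit start at the end. The model also shows why committing a literal 0 is risky. If retention has already deleted the start of the log, offset 0 is out of range, and the consumer falls back to "latest", silently skipping the table's history. Streams later commits its own offsets for the table topic. To re-read the table from the beginning on every start, the commit would therefore have to be repeated before each start.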

Guozhang


In reply to Greg Fodor (Fri, Mar 25, 2016 at 2:14 PM).

Re: KStream/KTable prioritization/initial offset

2016-03-25 Thread Greg Fodor
Hey Jay, thanks. The timestamp alignment makes sense and clears up
why prioritization isn't needed. It sounds like the one thing
I still don't understand is how we can tell Kafka Streams to start
reading from the latest or earliest offset for a given source.

I might be thinking about this the wrong way, but what I'm hoping to
be able to do easily is materialize a subset of our database tables
that are being streamed to Kafka via Kafka Connect, for joining, etc.,
in the streaming job. Meanwhile, we have other (higher-volume) topics
that are streams where we want to begin processing at the latest
message after kicking off the job; the historical data in the topic is
irrelevant. So for the table, I'd want to consume from the beginning
(and rely upon the timestamp alignment to effectively block the other
streams until it's caught up) whereas for the other streams I want to
start consuming at the latest offset. What's the best approach for
solving this?

In reply to Jay Kreps (Fri, Mar 25, 2016 at 12:05 PM).


Re: KStream/KTable prioritization/initial offset

2016-03-25 Thread Jay Kreps
Hey Greg,

I think the use case you have is that you have some derived state and you
want to make sure it is fully populated before any of the other input streams
are processed. There would be two ways to accomplish this: (1) have a changelog
that captures the state, or (2) reprocess the input to recreate the derived
state. I think Kafka Streams does (1) automatically, but (2) can
potentially be more efficient in certain cases, and, if I understand
correctly, you want to find a way to take advantage of that optimization.
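Option (2) amounts to folding the table's source topic back into a key-value map. The sketch below is minimal and makes two assumptions: the topic carries upserts keyed by primary key, and a null value means delete (the usual KTable tombstone convention). It models only the replay, not Kafka's state stores, and the names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of rebuilding derived table state by replaying its source topic.
class TableReplay {
    // One record from the source topic; a null value means "delete this key".
    static final class Change {
        final String key;
        final String value;
        Change(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Replays the whole log in order, keeping only the latest value per key.
    static Map<String, String> replay(List<Change> log) {
        Map<String, String> table = new LinkedHashMap<>();
        for (Change c : log) {
            if (c.value == null) {
                table.remove(c.key);      // tombstone: drop the row
            } else {
                table.put(c.key, c.value); // upsert: newer value wins
            }
        }
        return table;
    }
}
```

Replaying produces the same table regardless of what was committed earlier. For that reason, this option relies on always re-reading the source topic from its log start rather than resuming from a committed offset.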

What would be required to make (2) work?

You describe the need as fully processing some of the inputs, but I
actually think the default behavior in Kafka Streams may already handle
this better: it will try to align the inputs by timestamp. This should
actually be better in terms of keeping the state and the events at similar
points in time, which I think is what you really want.

However, to accomplish what you want, I think you need to be able to skip
committing offsets for the input used to populate the state store, so that
it always gets reprocessed, and you need to be able to disable the
changelog.

-Jay

In reply to Greg Fodor (Fri, Mar 25, 2016 at 11:55 AM).


Re: KStream/KTable prioritization/initial offset

2016-03-25 Thread Greg Fodor
That's great. Is there anything I can do to force that particular topic to
start at the earliest message every time the job is started?
In reply to Yasuhiro Matsuda (Mar 25, 2016, 10:20 AM).


Re: KStream/KTable prioritization/initial offset

2016-03-25 Thread Yasuhiro Matsuda
It may not be ideal, but there is a way to prioritize particular topics:
set their record timestamps to zero. This can be done with a custom
TimestampExtractor. Kafka Streams tries to synchronize multiple streams
using the extracted timestamps, so records with timestamp 0 have a
greater chance of being processed earlier than others.
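The effect described above can be illustrated with a toy model, which is not Kafka's implementation. Given several input queues, the model always processes the head record with the smallest extracted timestamp. The `extractor` function plays the role of a custom `org.apache.kafka.streams.processor.TimestampExtractor`, whose exact method signature has changed across Kafka versions. The class name, topic names and records are hypothetical.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.ToLongFunction;

// Toy model of timestamp-based input alignment (not Kafka's implementation).
class TimestampAlignment {
    // A buffered input record; `ts` is the record's own timestamp.
    static final class Rec {
        final String topic;
        final long ts;
        final String payload;
        Rec(String topic, long ts, String payload) {
            this.topic = topic;
            this.ts = ts;
            this.payload = payload;
        }
    }

    // Drains the queues, each step taking the head record whose extracted
    // timestamp is smallest (ties go to the earlier queue in the list).
    static List<String> order(List<Deque<Rec>> queues, ToLongFunction<Rec> extractor) {
        List<String> processed = new ArrayList<>();
        while (true) {
            Deque<Rec> next = null;
            for (Deque<Rec> q : queues) {
                if (q.isEmpty()) continue;
                if (next == null
                        || extractor.applyAsLong(q.peekFirst()) < extractor.applyAsLong(next.peekFirst())) {
                    next = q;
                }
            }
            if (next == null) return processed;   // every queue is empty
            processed.add(next.pollFirst().payload);
        }
    }
}
```

With the identity extractor `r -> r.ts`, table records are interleaved with events by time. With `r -> r.topic.equals("table-topic") ? 0L : r.ts`, every buffered table record is processed first. The model assumes all records are already buffered. Kafka Streams can only compare records it has already fetched, which is presumably why this trick gives only a "greater chance" of earlier processing rather than a guarantee.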

In reply to Greg Fodor (Thu, Mar 24, 2016 at 6:57 PM).


KStream/KTable prioritization/initial offset

2016-03-24 Thread Greg Fodor
Really digging Kafka Streams so far, nice work all. I'm interested in
being able to materialize one or more KTables in full before the rest
of the topology begins processing messages. This seems fundamentally
useful since it allows you to get your database tables replicated up
off the change stream topics from Connect before the stream processing
workload starts.

In Samza we have bootstrap streams and stream prioritization to help
facilitate this. What seems desirable for Kafka Streams is:

- Per-source prioritization (with priorities defaulting to >0, setting a
stream's priority to 0 effectively bootstraps it)
- Per-source initial offset settings (earliest or latest, default to latest)

To solve the KTable materialization problem, you'd set priority to 0
for its source and the source offset setting to earliest.

Right now it appears the only control you have for re-processing is
AUTO_OFFSET_RESET_CONFIG, but I believe this is a global setting for
the consumers and hence for the entire job. Beyond that, I don't see any
way to prioritize stream consumption at all, so your KTables will be
getting materialized while the general stream processing work is
running concurrently.

I wanted to see whether this case is already supported and I'm
missing something, or, if not, whether these options make sense. If this
seems reasonable and it's not too complicated, I could possibly try to
get together a patch. If so, any tips on implementing this would be
helpful as well. Thanks!

-Greg