Re: Idle cluster high CPU usage

2017-09-25 Thread Elliot Crosby-McCullough
We did a bunch of sampling to no particular avail; broadly speaking the
answer was "it's doing a bunch of talking".

For those who might want to know what this was in the end: during part of
our previous debugging we enabled `javax.net.debug=all`, didn't twig that
it had no effect on the log4j logs, and didn't notice the vast number of
IOPS going to `kafkaServer.out`.  Writing that log was eating all the
CPU.
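For anyone else chasing this, the shape of the mistake, sketched (the wrapper and paths here are illustrative, not our exact setup):

```shell
# SSL debug tracing had been enabled JVM-wide via the broker's environment:
export KAFKA_OPTS="-Djavax.net.debug=all"
# javax.net.debug writes to stdout, which the startup script redirects to
# kafkaServer.out -- it never goes through log4j, so server.log looked normal
# while kafkaServer.out absorbed a huge write load.  The fix: drop the flag
# and restart each broker.
KAFKA_OPTS=$(printf '%s' "$KAFKA_OPTS" | sed 's/-Djavax\.net\.debug=all//')
export KAFKA_OPTS
```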

On 23 September 2017 at 00:44, jrpi...@gmail.com <jrpi...@gmail.com> wrote:

> One thing worth trying is hooking up to one or more of the brokers via JMX
> and examining the running threads; if that doesn't elucidate the cause,
> you could move on to sampling or profiling via JMX to see what's taking up
> all that CPU.
>
> - Jordan Pilat
>
> On 2017-09-21 07:58, Elliot Crosby-McCullough <elliot.crosby-mccullough@
> freeagent.com> wrote:
> > Hello,
> >
> > We've been trying to debug an issue with our kafka cluster for several
> days
> > now and we're close to out of options.
> >
> > We have 3 kafka brokers associated with 3 zookeeper nodes and 3 registry
> > nodes, plus a few streams clients and a ruby producer.
> >
> > Two of the three brokers are pinning a core and have been for days, no
> > amount of restarting, debugging, or clearing out of data seems to help.
> >
> > We've got the logs at DEBUG level which shows a constant flow much like
> > this: https://gist.github.com/elliotcm/e66a1ca838558664bab0c91549acb251
> >
> > As best as we can tell the brokers are up to date on replication and the
> > leaders are well-balanced.  The cluster is receiving no traffic; no
> > messages are being sent in and the consumers/streams are shut down.
> >
> > From our profiling of the JVM it looks like the CPU is mostly working in
> > replication threads and SSL traffic (it's a secured cluster) but that
> > shouldn't be treated as gospel.
> >
> > Any advice would be greatly appreciated.
> >
> > All the best,
> > Elliot
> >
>
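Jordan's suggestion needs nothing beyond the JDK to prototype. This sketch lists live threads with their CPU time in-process, the same data a JMX console shows; against a broker you would attach JConsole/VisualVM (or a remote JMX connection) to its JMX port rather than running inside it. Class and method names here are invented for illustration.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

// List every live JVM thread with its accumulated CPU time, the same view
// you would get by browsing java.lang:type=Threading over JMX.
public class ThreadCpuReport {
    static int liveThreadCount() {
        return ManagementFactory.getThreadMXBean().getThreadCount();
    }

    public static void main(String[] args) {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        for (long id : threads.getAllThreadIds()) {
            ThreadInfo info = threads.getThreadInfo(id);
            if (info == null) continue;                    // thread exited meanwhile
            long cpuNanos = threads.isThreadCpuTimeSupported()
                    ? threads.getThreadCpuTime(id) : -1L;  // -1 when unavailable
            System.out.printf("%-45s cpu=%d ms%n", info.getThreadName(),
                    cpuNanos < 0 ? -1 : cpuNanos / 1_000_000);
        }
    }
}
```

A thread that pins a core shows a CPU figure growing in step with wall-clock time between two samples, which narrows the search before reaching for a full profiler.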


Re: Idle cluster high CPU usage

2017-09-21 Thread Elliot Crosby-McCullough
Nothing; that value (that group of values) was at the default when we
started the debugging.
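For reference, the fetcher-related broker settings under discussion, with their stock defaults (illustrative; not our actual config, which is in the gist below):

```properties
# Broker defaults relevant to idle replication chatter:
num.replica.fetchers=1          # fetcher threads per source broker; more = more parallel replication
replica.fetch.wait.max.ms=500   # max time a fetch request parks when the leader has no new data
replica.fetch.min.bytes=1       # minimum bytes before a fetch response is sent
```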

On 21 September 2017 at 15:08, Ismael Juma <ism...@juma.me.uk> wrote:

> Thanks. What happens if you reduce num.replica.fetchers?
>
> On Thu, Sep 21, 2017 at 3:02 PM, Elliot Crosby-McCullough <
> elliot.crosby-mccullo...@freeagent.com> wrote:
>
> > 551 partitions, broker configs are:
> > https://gist.github.com/elliotcm/3a35f66377c2ef4020d76508f49f106b
> >
> > We tweaked it a bit from standard recently but that was as part of the
> > debugging process.
> >
> > After some more experimentation I'm seeing the same behaviour at about
> half
> > the CPU after creating one 50 partition topic in an otherwise empty
> > cluster.
> >
> > On 21 September 2017 at 14:20, Ismael Juma <ism...@juma.me.uk> wrote:
> >
> > > A couple of questions: how many partitions in the cluster and what are
> > your
> > > broker configs?
> > >
> > > On Thu, Sep 21, 2017 at 1:58 PM, Elliot Crosby-McCullough <
> > > elliot.crosby-mccullo...@freeagent.com> wrote:
> > >
> > > > Hello,
> > > >
> > > > We've been trying to debug an issue with our kafka cluster for
> several
> > > days
> > > > now and we're close to out of options.
> > > >
> > > > We have 3 kafka brokers associated with 3 zookeeper nodes and 3
> > registry
> > > > nodes, plus a few streams clients and a ruby producer.
> > > >
> > > > Two of the three brokers are pinning a core and have been for days,
> no
> > > > amount of restarting, debugging, or clearing out of data seems to
> help.
> > > >
> > > > We've got the logs at DEBUG level which shows a constant flow much
> like
> > > > this: https://gist.github.com/elliotcm/e66a1ca838558664bab0c91549acb251
> > > >
> > > > As best as we can tell the brokers are up to date on replication and
> > the
> > > > leaders are well-balanced.  The cluster is receiving no traffic; no
> > > > messages are being sent in and the consumers/streams are shut down.
> > > >
> > > > From our profiling of the JVM it looks like the CPU is mostly working
> > in
> > > > replication threads and SSL traffic (it's a secured cluster) but that
> > > > shouldn't be treated as gospel.
> > > >
> > > > Any advice would be greatly appreciated.
> > > >
> > > > All the best,
> > > > Elliot
> > > >
> > >
> >
>


Re: Idle cluster high CPU usage

2017-09-21 Thread Elliot Crosby-McCullough
551 partitions, broker configs are:
https://gist.github.com/elliotcm/3a35f66377c2ef4020d76508f49f106b

We tweaked it a bit from standard recently but that was as part of the
debugging process.

After some more experimentation I'm seeing the same behaviour at about half
the CPU after creating a single 50-partition topic in an otherwise empty
cluster.

On 21 September 2017 at 14:20, Ismael Juma <ism...@juma.me.uk> wrote:

> A couple of questions: how many partitions in the cluster and what are your
> broker configs?
>
> On Thu, Sep 21, 2017 at 1:58 PM, Elliot Crosby-McCullough <
> elliot.crosby-mccullo...@freeagent.com> wrote:
>
> > Hello,
> >
> > We've been trying to debug an issue with our kafka cluster for several
> days
> > now and we're close to out of options.
> >
> > We have 3 kafka brokers associated with 3 zookeeper nodes and 3 registry
> > nodes, plus a few streams clients and a ruby producer.
> >
> > Two of the three brokers are pinning a core and have been for days, no
> > amount of restarting, debugging, or clearing out of data seems to help.
> >
> > We've got the logs at DEBUG level which shows a constant flow much like
> > this: https://gist.github.com/elliotcm/e66a1ca838558664bab0c91549acb251
> >
> > As best as we can tell the brokers are up to date on replication and the
> > leaders are well-balanced.  The cluster is receiving no traffic; no
> > messages are being sent in and the consumers/streams are shut down.
> >
> > From our profiling of the JVM it looks like the CPU is mostly working in
> > replication threads and SSL traffic (it's a secured cluster) but that
> > shouldn't be treated as gospel.
> >
> > Any advice would be greatly appreciated.
> >
> > All the best,
> > Elliot
> >
>


Idle cluster high CPU usage

2017-09-21 Thread Elliot Crosby-McCullough
Hello,

We've been trying to debug an issue with our kafka cluster for several days
now and we're close to out of options.

We have 3 kafka brokers associated with 3 zookeeper nodes and 3 registry
nodes, plus a few streams clients and a ruby producer.

Two of the three brokers are pinning a core and have been for days, no
amount of restarting, debugging, or clearing out of data seems to help.

We've got the logs at DEBUG level which shows a constant flow much like
this: https://gist.github.com/elliotcm/e66a1ca838558664bab0c91549acb251

As best as we can tell the brokers are up to date on replication and the
leaders are well-balanced.  The cluster is receiving no traffic; no
messages are being sent in and the consumers/streams are shut down.

From our profiling of the JVM it looks like the CPU is mostly working in
replication threads and SSL traffic (it's a secured cluster) but that
shouldn't be treated as gospel.

Any advice would be greatly appreciated.

All the best,
Elliot


Re: Custom stream processor not triggering #punctuate()

2017-03-28 Thread Elliot Crosby-McCullough
Hi Michael,

My confusion was that the events are being created, transferred, and
received several seconds apart (longer than the punctuate schedule) with no
stalling, because I'm triggering them by hand; so regardless of which
mechanism is used for timing, punctuate should still be called.

That said, I've just noticed in the callout box that stream time only
advances when all input topics have new data, which in my testing is not
the case.  I suppose I will need to attach the processor to each input
topic rather than processing them all at the same time (in this use case
they were being split back out in the processor).

Thanks,
Elliot
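The stream-time rule quoted below can be modelled without Kafka at all, which makes the behaviour easy to see: stream time is the minimum of the latest timestamps across input partitions, so a partition with no data pins it in place. A toy sketch (all names invented; not a Kafka API):

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of 0.10.x punctuate semantics: stream time = min over partitions
// of the latest record timestamp seen, and punctuate fires when stream time
// has advanced past the scheduled interval (a catch-up yields a single call,
// as described in this thread).
public class StreamTimeToy {
    private final Map<String, Long> latestByPartition = new HashMap<>();
    private final long intervalMs;
    private long lastPunctuate = 0L;
    int punctuateCalls = 0;

    StreamTimeToy(long intervalMs, String... partitions) {
        this.intervalMs = intervalMs;
        for (String p : partitions) latestByPartition.put(p, 0L);
    }

    void process(String partition, long recordTimestampMs) {
        latestByPartition.merge(partition, recordTimestampMs, Math::max);
        long streamTime = latestByPartition.values().stream()
                .mapToLong(Long::longValue).min().orElse(0L);
        if (streamTime - lastPunctuate >= intervalMs) {
            lastPunctuate = streamTime;
            punctuateCalls++;
        }
    }
}
```

Records arriving only on partition "A" never fire punctuate, however far apart they are in real time, because "B" still sits at time zero; one record on "B" finally advances stream time and a single call fires.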

On 28 March 2017 at 10:18, Michael Noll <mich...@confluent.io> wrote:

> Elliot,
>
> in the current API, `punctuate()` is called based on the current
> stream-time (which defaults to event-time), not based on the current
> wall-clock time / processing-time.  See
> http://docs.confluent.io/current/streams/faq.html#why-is-punctuate-not-called.
> The stream-time is
> advanced only when new input records are coming in, so if there's e.g. a
> stall on incoming records, then `punctuate()` will not be called.
>
> If you need to schedule a call every N minutes of wall-clock time you'd
> need to use your own scheduler.
>
> Does that help?
> Michael
>
>
>
> On Tue, Mar 28, 2017 at 10:58 AM, Elliot Crosby-McCullough <
> elliot.crosby-mccullo...@freeagent.com> wrote:
>
> > Hi there,
> >
> > I've written a simple processor which expects to have #process called on
> it
> > for each message and configures regular punctuate calls via
> > `context.schedule`.
> >
> > Regardless of what configuration I try for timestamp extraction I cannot
> > get #punctuate to be called, despite #process being called for every
> > message (which are being sent several seconds apart).  I've set the
> > schedule as low as 1 (though the docs aren't clear whether that's micro,
> > milli, or just seconds) and tried both the wallclock time extractor and
> the
> > default time extractor in both the global config and the state store
> serde.
> >
> > These particular messages are being generated by another kafka streams
> DSL
> > application and I'm using kafka 0.10.2.0, so presumably they also have
> > automatically embedded timestamps.
> >
> > I can't for the life of me figure out what's going on.  Could you clue me
> > in?
> >
> > Thanks,
> > Elliot
> >
>


Custom stream processor not triggering #punctuate()

2017-03-28 Thread Elliot Crosby-McCullough
Hi there,

I've written a simple processor which expects to have #process called on it
for each message and configures regular punctuate calls via
`context.schedule`.

Regardless of what configuration I try for timestamp extraction I cannot
get #punctuate to be called, despite #process being called for every
message (which are being sent several seconds apart).  I've set the
schedule as low as 1 (though the docs aren't clear whether that's micro,
milli, or just seconds) and tried both the wallclock time extractor and the
default time extractor in both the global config and the state store serde.

These particular messages are being generated by another kafka streams DSL
application and I'm using kafka 0.10.2.0, so presumably they also have
automatically embedded timestamps.

I can't for the life of me figure out what's going on.  Could you clue me
in?

Thanks,
Elliot


Re: Table a KStream

2017-02-12 Thread Elliot Crosby-McCullough
Thanks!

On 11 February 2017 at 18:39, Eno Thereska <eno.there...@gmail.com> wrote:

> Yes you are correct Elliot. If you look at it just from the point of view
> of what's in the topic, and how the topic is configured (e.g., with
> "compact") there is no difference.
>
>
> Thanks
> Eno
>
> > On 11 Feb 2017, at 17:56, Elliot Crosby-McCullough <elliot...@gmail.com>
> wrote:
> >
> > For my own clarity, is there any actual distinction between
> > `stream.to('topic')`
> > where `topic` is set to compact and the upcoming
> `stream.toTable('topic')`
> > if you're not going to immediately use the table in this topology, i.e.
> if
> > you want to use it as a table in some other processor application?  Am I
> > right in my understanding that the table aspect is related to behaviour
> > within this specific processor (GlobalKTables aside)?
> >
> > On 10 February 2017 at 21:59, Matthias J. Sax <matth...@confluent.io>
> wrote:
> >
> >> I agree that the API can be improved and we are working on that.
> >>
> >> Btw: KStream#toTable() was already suggested in KIP-114 discussion:
> >>
> >> http://search-hadoop.com/m/Kafka/uyzND19QaLMqiR2e1?subj=Re+DISCUSS+KIP+114+KTable+materialization+and+improved+semantics
> >>
> >>
> >> However for now, you can only choose from the two options as described.
> >>
> >>
> >> -Matthias
> >>
> >>
> >>
> >> On 2/10/17 1:49 PM, Nick DeCoursin wrote:
> >>> To be honest, I don't think either of these options are very good.
> >>>
> >>>
> >>> stream.to("some-other-topic");
> >>> builder.table("some-other-topic");
> >>>
> >>>
> >>> As explained here
> >>> <http://mail-archives.apache.org/mod_mbox/kafka-users/201702.mbox/browser>,
> >>> if the underlying topic doesn't have cleanup.policy=compact, the state
> >>> store may invisibly lose data!
> >>>
> >>> As for the second option, it's obviously verbose, and it doesn't let me
> >>> choose the topic name. As a result, I end with some long weird
> changelog
> >>> topic name; and, some of the Kafka Connectors map topics to the table
> >> name,
> >>> eeeh.
> >>>
> >>> To improve this, just add a topic parameter to both of the overloaded
> >>> KGroupedStream.reduce methods:
> >>>
> >>> reduce(Reducer reducer, String, topicName, String storeName)
> >>>
> >>>
> >>> Furthermore, there should be a short cut to the second option, like
> this:
> >>>
> >>> // Creates a KTable from the KStream where
> >>> // the key of the KStream is the key of the KTable.
> >>> // Any latter key overwrites the former.
> >>> someStream.table(
> >>>   Serde,
> >>>   Serde,
> >>>   topicName,
> >>>   tableName
> >>> );
> >>>
> >>>
> >>> or maybe the Serdes can be inferred? Either way, this would be a nice
> >> clean
> >>> approach to a (maybe?) common use case.
> >>>
> >>> Thank you,
> >>> Nick
> >>>
> >>> On 25 January 2017 at 07:46, Nick DeCoursin <n.decour...@foodpanda.com
> >
> >>> wrote:
> >>>
> >>>> Thank you very much, both suggestions are wonderful, and I will try
> >> them.
> >>>> Have a great day!
> >>>>
> >>>> Kind regards,
> >>>> Nick
> >>>>
> >>>> On 24 January 2017 at 19:46, Matthias J. Sax <matth...@confluent.io>
> >>>> wrote:
> >>>>
> >>>>> If your data is already partitioned by key, you can save writing to a
> >>>>> topic by doing a dummy reduce instead:
> >>>>>
> >>>>> stream
> >>>>>  .groupByKey()
> >>>>>  .reduce(new Reducer() {
> >>>>>V apply(V value1, V value2) {
> >>>>>  return value2;
> >>>>>}
> >>>>>  },
> >>>>>  "yourStoreName");
> >>>>>
> >>>>> (replace V with your actual value type)
> >>>>>
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>> On 1/24/17 8:53 AM, Damian Guy wrote:

Re: Table a KStream

2017-02-11 Thread Elliot Crosby-McCullough
For my own clarity, is there any actual distinction between
`stream.to('topic')`
where `topic` is set to compact and the upcoming `stream.toTable('topic')`
if you're not going to immediately use the table in this topology, i.e. if
you want to use it as a table in some other processor application?  Am I
right in my understanding that the table aspect is related to behaviour
within this specific processor (GlobalKTables aside)?
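As a plain-Java aside, the "dummy reduce" trick quoted below works because a KTable is last-value-wins per key, which is exactly what reduce((v1, v2) -> v2) computes. A sketch with a Map standing in for the backing state store (names invented):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Replays a changelog (key/value records in arrival order) into a table:
// each later value for a key overwrites the earlier one, mirroring both a
// compacted topic and the dummy-reduce KTable.
public class LastValueWins {
    static <K, V> Map<K, V> toTable(Iterable<Map.Entry<K, V>> changelog) {
        Map<K, V> table = new LinkedHashMap<>();
        for (Map.Entry<K, V> record : changelog)
            table.merge(record.getKey(), record.getValue(), (v1, v2) -> v2); // keep newest
        return table;
    }
}
```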

On 10 February 2017 at 21:59, Matthias J. Sax  wrote:

> I agree that the API can be improved and we are working on that.
>
> Btw: KStream#toTable() was already suggested in KIP-114 discussion:
>
> http://search-hadoop.com/m/Kafka/uyzND19QaLMqiR2e1?subj=Re+DISCUSS+KIP+114+KTable+materialization+and+improved+semantics
>
>
> However for now, you can only choose from the two options as described.
>
>
> -Matthias
>
>
>
> On 2/10/17 1:49 PM, Nick DeCoursin wrote:
> > To be honest, I don't think either of these options are very good.
> >
> >
> > stream.to("some-other-topic");
> > builder.table("some-other-topic");
> >
> >
> > As explained here
> > <http://mail-archives.apache.org/mod_mbox/kafka-users/201702.mbox/browser>,
> > if the underlying topic doesn't have cleanup.policy=compact, the state
> > store may invisibly lose data!
> >
> > As for the second option, it's obviously verbose, and it doesn't let me
> > choose the topic name. As a result, I end with some long weird changelog
> > topic name; and, some of the Kafka Connectors map topics to the table
> name,
> > eeeh.
> >
> > To improve this, just add a topic parameter to both of the overloaded
> > KGroupedStream.reduce methods:
> >
> > reduce(Reducer reducer, String, topicName, String storeName)
> >
> >
> > Furthermore, there should be a short cut to the second option, like this:
> >
> > // Creates a KTable from the KStream where
> > // the key of the KStream is the key of the KTable.
> > // Any latter key overwrites the former.
> > someStream.table(
> >Serde,
> >Serde,
> >topicName,
> >tableName
> > );
> >
> >
> > or maybe the Serdes can be inferred? Either way, this would be a nice
> clean
> > approach to a (maybe?) common use case.
> >
> > Thank you,
> > Nick
> >
> > On 25 January 2017 at 07:46, Nick DeCoursin 
> > wrote:
> >
> >> Thank you very much, both suggestions are wonderful, and I will try
> them.
> >> Have a great day!
> >>
> >> Kind regards,
> >> Nick
> >>
> >> On 24 January 2017 at 19:46, Matthias J. Sax 
> >> wrote:
> >>
> >>> If your data is already partitioned by key, you can save writing to a
> >>> topic by doing a dummy reduce instead:
> >>>
> >>> stream
> >>>   .groupByKey()
> >>>   .reduce(new Reducer() {
> >>> V apply(V value1, V value2) {
> >>>   return value2;
> >>> }
> >>>   },
> >>>   "yourStoreName");
> >>>
> >>> (replace V with your actual value type)
> >>>
> >>>
> >>> -Matthias
> >>>
> >>> On 1/24/17 8:53 AM, Damian Guy wrote:
>  Hi Nick,
> 
>  I guess there is some reason why you can't just build it as a table to
>  begin with?
>  There isn't a convenient method for doing this right now, but you
> could
> >>> do
>  something like:
> 
>  stream.to("some-other-topic");
>  builder.table("some-other-topic");
> 
>  Thanks,
>  Damian
> 
>  On Tue, 24 Jan 2017 at 16:32 Nick DeCoursin <
> n.decour...@foodpanda.com>
>  wrote:
> 
> > Hello,
> >
> > How can I simply table a Kafka Stream? I have a Kafka Stream, and I
> >>> want to
> > create a table from it backed by a state store. The key of the stream
> >>> could
> > be the same as the table.
> >
> > I've tried following examples, but it seems all examples use
> `groupBy`
> >>> or
> > `count` to convert `KStream`s into `KTable`s. Is there any other way?
> >
> > Thank you very much,
> > Nick DeCoursin
> >
> > --
> >
> > Nick DeCoursin
> > Software Engineer
> > foodpanda
> >
> > Tel | +1 920 450 5434 <(920)%20450-5434>
> >
> > Mail | n.decour...@foodpanda.com
> >
> > Skype | nick.foodpanda
> >
> > Foodpanda GmbH | Schreiberhauer Str. 30 | 10317 Berlin | Germany
> > Sitz der Gesellschaft | Berlin, AG Charlottenburg | HRB 138224 B |
> > USt-ID-Nr | DE 283789080
> > Geschäftsführer | Benjamin Bauer, Felix Plog, Ralf Wenzel
> >
> > CONFIDENTIALITY NOTICE: This message (including any attachments) is
> > confidential and may be privileged. It may be read, copied and used
> >>> only by
> > the intended recipient. If you have received it in error please
> >>> contact the
> > sender (by return e-mail) immediately and delete this message. Any
> > unauthorized use or dissemination of this message in whole or in
> parts
> >>> is
> > strictly prohibited.
> >
> 
> >>>
> >>>
> >>
> >>

Re: Kafka Streams punctuate with slow-changing KTables

2017-02-02 Thread Elliot Crosby-McCullough
Right.  Maybe it's best to use some kind of idempotent foreign key then, or
at least a small in-thread cache.  Thanks for the info.

On 2 February 2017 at 09:46, Damian Guy <damian@gmail.com> wrote:

> Hi Elliot,
>
> With GlobalKTables your processor wouldn't be able to write directly to the
> table - they are read-only from all threads except the thread that keeps
> them up-to-date.
> You could potentially write the dimension data to the topic that is the
> source of the GlobalKTable, but there is no guarantee that the GlobalKTable
> will have been updated before you receive another message that is referring
> to the same dimension. So you'd need to handle this somehow yourself.
>
> On Thu, 2 Feb 2017 at 08:26 Elliot Crosby-McCullough <elliot...@gmail.com>
> wrote:
>
> > Sorry I left out too much context there.
> >
> > The current plan is to take a raw stream of events as a source and split
> > them into a stream of facts and a table (or tables) of dimensions.
> >
> > Because this is denormalising the data, we only want one copy of each
> > dimension entry despite the original events repeating them over and over.
> >
> > The current plan is therefore to query the dimension table when a
> relevant
> > event comes in so we can apply the following logic:
> > 1. If there's a matching dimension entry already in the table, reference
> > that in the fact
> > 2. If not, create one in the table and reference it in the fact
> >
> > Hence find or create.
> >
> > This means that the tables will need to be read/write in some way by the
> > processor.  I referred to it as "first pass" because there will be
> > downstream processors in this app or another which will be reading the
> fact
> > and dimension topics and putting them into DBs like redshift.
> >
> > On 1 February 2017 at 22:40, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> > > I am not sure if I can follow... what do you mean by "find or create"
> > > semantics?
> > >
> > > What do you mean by "my first pass processor"?
> > >
> > >
> > > -Matthias
> > >
> > > On 2/1/17 12:24 PM, Elliot Crosby-McCullough wrote:
> > > > Ah, bueno!
> > > >
> > > > The first use case I have is actually to split a raw event stream
> into
> > > > facts and dimensions, but it looks like this is still the right
> > solution
> > > as
> > > > interactive queries can be done against it for "find or create"
> > > semantics.
> > > >
> > > > The KIP mentions that the GlobalKTable "will only be used for doing
> > > > lookups", but presumably my first pass processor can still output new
> > > > dimension entries to the topic that the table is backed by?  Again
> for
> > > > "find or create".
> > > >
> > > > On 1 February 2017 at 19:21, Matthias J. Sax <matth...@confluent.io>
> > > wrote:
> > > >
> > > >> Thanks!
> > > >>
> > > >> About your example: in upcoming 0.10.2 release we will have
> > > >> KStream-GlobalKTable join that is designed for exactly your use case
> > to
> > > >> enrich a "fact stream" with dimension tables.
> > > >>
> > > >> If you use this feature, "stream time" will not depend on the
> > > >> GlobalKTables (but only on the "main topology") and thus the
> current
> > > >> punctuate() issues should be resolved for this case.
> > > >>
> > > >> cf.
> > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-99%3A+Add+Global+Tables+to+Kafka+Streams
> > > >>
> > > >>
> > > >>
> > > >> -Matthias
> > > >>
> > > >> On 2/1/17 10:31 AM, Elliot Crosby-McCullough wrote:
> > > >>> Yes it does make sense, thank you, that's what I thought the
> > behaviour
> > > >> was.
> > > >>>
> > > >>> I think having the option of triggering at least every n real-time
> > > >> seconds
> > > >>> (or whatever period) would be very useful, as I can see a number of
> > > >>> situations where the time between updates to some tables might be
> > very
> > > >>> infrequent indeed.
> > > >>>
> > > >>> To provide a concrete example, this KTable will be holding the
> > > >>> dimensions of a star schema while the KStream will be holding the
> > > >>> facts.

Re: Kafka Streams punctuate with slow-changing KTables

2017-02-02 Thread Elliot Crosby-McCullough
Sorry I left out too much context there.

The current plan is to take a raw stream of events as a source and split
them into a stream of facts and a table (or tables) of dimensions.

Because this is denormalising the data, we only want one copy of each
dimension entry despite the original events repeating them over and over.

The current plan is therefore to query the dimension table when a relevant
event comes in so we can apply the following logic:
1. If there's a matching dimension entry already in the table, reference
that in the fact
2. If not, create one in the table and reference it in the fact

Hence find or create.

This means that the tables will need to be read/write in some way by the
processor.  I referred to it as "first pass" because there will be
downstream processors in this app or another which will be reading the fact
and dimension topics and putting them into DBs like redshift.
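The find-or-create step above can be sketched in plain Java, with a Map standing in for the dimension table's state store (in Kafka Streams this would be a KeyValueStore queried from the processor; all names here are invented):

```java
import java.util.HashMap;
import java.util.Map;

// Find-or-create for dimension entries:
// 1. If a matching entry exists, return its surrogate key for the fact.
// 2. If not, create the entry (in the real topology this write would also
//    go to the dimension topic) and return the new key.
public class DimensionLookup {
    private final Map<String, Long> store = new HashMap<>();
    private long nextSurrogateKey = 1L;

    long findOrCreate(String dimensionValue) {
        return store.computeIfAbsent(dimensionValue, v -> nextSurrogateKey++);
    }
}
```

Repeated events carrying the same dimension value all resolve to the same key, which is what keeps the denormalised output to one copy per dimension entry.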

On 1 February 2017 at 22:40, Matthias J. Sax <matth...@confluent.io> wrote:

> I am not sure if I can follow... what do you mean by "find or create"
> semantics?
>
> What do you mean by "my first pass processor"?
>
>
> -Matthias
>
> On 2/1/17 12:24 PM, Elliot Crosby-McCullough wrote:
> > Ah, bueno!
> >
> > The first use case I have is actually to split a raw event stream into
> > facts and dimensions, but it looks like this is still the right solution
> as
> > interactive queries can be done against it for "find or create"
> semantics.
> >
> > The KIP mentions that the GlobalKTable "will only be used for doing
> > lookups", but presumably my first pass processor can still output new
> > dimension entries to the topic that the table is backed by?  Again for
> > "find or create".
> >
> > On 1 February 2017 at 19:21, Matthias J. Sax <matth...@confluent.io>
> wrote:
> >
> >> Thanks!
> >>
> >> About your example: in upcoming 0.10.2 release we will have
> >> KStream-GlobalKTable join that is designed for exactly your use case to
> >> enrich a "fact stream" with dimension tables.
> >>
> >> If you use this feature, "stream time" will not depend on the
> >> GlobalKTables (but only on the "main topology") and thus the current
> >> punctuate() issues should be resolved for this case.
> >>
> >> cf.
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-99%3A+Add+Global+Tables+to+Kafka+Streams
> >>
> >>
> >>
> >> -Matthias
> >>
> >> On 2/1/17 10:31 AM, Elliot Crosby-McCullough wrote:
> >>> Yes it does make sense, thank you, that's what I thought the behaviour
> >> was.
> >>>
> >>> I think having the option of triggering at least every n real-time
> >> seconds
> >>> (or whatever period) would be very useful, as I can see a number of
> >>> situations where the time between updates to some tables might be very
> >>> infrequent indeed.
> >>>
> >>> To provide a concrete example, this KTable will be holding the
> dimensions
> >>> of a star schema while the KStream will be holding the facts.  If the
> >>> KTable (or a partition thereof) is limited to one dimension type (i.e.
> >>> one-to-one with the real star schema tables) then in some cases it will
> >> be
> >>> hours or days apart.  The KTable representing the `date` dimension will
> >> not
> >>> update faster than once per day.
> >>>
> >>> Likewise using kafka as a changelog for a table like Rails'
> >>> `schema_migrations` could easily go weeks without an update.
> >>>
> >>> Until the decision is made regarding the timing would it be best to
> >> ignore
> >>> `punctuate` entirely and trigger everything message by message via
> >>> `process`?
> >>>
> >>> On 1 February 2017 at 17:43, Matthias J. Sax <matth...@confluent.io>
> >> wrote:
> >>>
> >>>> One thing to add:
> >>>>
> >>>> There are plans/ideas to change punctuate() semantics to "system time"
> >>>> instead of "stream time". Would this be helpful for your use case?
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 2/1/17 9:41 AM, Matthias J. Sax wrote:
> >>>>> Yes and no.
> >>>>>
> >>>>> It does not depend on the number of tuples but on the timestamps of
> the
> >>>>> tuples.

Re: Kafka Streams punctuate with slow-changing KTables

2017-02-01 Thread Elliot Crosby-McCullough
Ah, bueno!

The first use case I have is actually to split a raw event stream into
facts and dimensions, but it looks like this is still the right solution as
interactive queries can be done against it for "find or create" semantics.

The KIP mentions that the GlobalKTable "will only be used for doing
lookups", but presumably my first pass processor can still output new
dimension entries to the topic that the table is backed by?  Again for
"find or create".

On 1 February 2017 at 19:21, Matthias J. Sax <matth...@confluent.io> wrote:

> Thanks!
>
> About your example: in upcoming 0.10.2 release we will have
> KStream-GlobalKTable join that is designed for exactly your use case to
> enrich a "fact stream" with dimension tables.
>
> If you use this feature, "stream time" will not depend on the
> GlobalKTables (but only on the "main topology") and thus the current
> punctuate() issues should be resolved for this case.
>
> cf.
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-99%3A+Add+Global+Tables+to+Kafka+Streams
>
>
>
> -Matthias
>
> On 2/1/17 10:31 AM, Elliot Crosby-McCullough wrote:
> > Yes it does make sense, thank you, that's what I thought the behaviour
> was.
> >
> > I think having the option of triggering at least every n real-time
> seconds
> > (or whatever period) would be very useful, as I can see a number of
> > situations where the time between updates to some tables might be very
> > infrequent indeed.
> >
> > To provide a concrete example, this KTable will be holding the dimensions
> > of a star schema while the KStream will be holding the facts.  If the
> > KTable (or a partition thereof) is limited to one dimension type (i.e.
> > one-to-one with the real star schema tables) then in some cases it will
> be
> > hours or days apart.  The KTable representing the `date` dimension will
> not
> > update faster than once per day.
> >
> > Likewise using kafka as a changelog for a table like Rails'
> > `schema_migrations` could easily go weeks without an update.
> >
> > Until the decision is made regarding the timing would it be best to
> ignore
> > `punctuate` entirely and trigger everything message by message via
> > `process`?
> >
> > On 1 February 2017 at 17:43, Matthias J. Sax <matth...@confluent.io>
> wrote:
> >
> >> One thing to add:
> >>
> >> There are plans/ideas to change punctuate() semantics to "system time"
> >> instead of "stream time". Would this be helpful for your use case?
> >>
> >>
> >> -Matthias
> >>
> >> On 2/1/17 9:41 AM, Matthias J. Sax wrote:
> >>> Yes and no.
> >>>
> >>> It does not depend on the number of tuples but on the timestamps of the
> >>> tuples.
> >>>
> >>> I would assume, that records in the high volume stream have timestamps
> >>> that are only a few milliseconds from each other, while for the low
> >>> volume KTable, records have timestamp differences that are much bigger
> >>> (maybe seconds).
> >>>
> >>> Thus, even if you schedule a punctuation every 30 seconds, it will get
> >>> triggered as expected. As you get KTable input on a second basis that
> >>> advanced KTable time in larger steps -- thus KTable always "catches
> up".
> >>>
> >>> Only for the (real time) case, that a single partition does not make
> >>> progress because no new data gets appended for longer than your
> >>> punctuation interval, some calls to punctuate might not fire.
> >>>
> >>> Let's say the KTable does not get an update for 5 Minutes, than you
> >>> would miss 9 calls to punctuate(), and get only a single call after the
> >>> KTable update. (Of course, only if all partitions advance time
> >> accordingly.)
> >>>
> >>>
> >>> Does this make sense?
> >>>
> >>>
> >>> -Matthias
> >>>
> >>> On 2/1/17 7:37 AM, Elliot Crosby-McCullough wrote:
> >>>> Hi there,
> >>>>
> >>>> I've been reading through the Kafka Streams documentation and there
> >> seems
> >>>> to be a tricky limitation that I'd like to make sure I've understood
> >>>> correctly.
> >>>>
> >>>> The docs[1] talk about the `punctuate` callback being based on stream
> >> time
> >>>> and that all incoming partitions of all incoming topics must have
> >>>> progressed through the minimum time interval for `punctuate` to be
> >> called.
> >>>>
> >>>> This seems to be like a problem for situations where you have one very
> >> fast
> >>>> and one very slow stream being processed together, for example
> joining a
> >>>> fast-moving KStream to a slow-changing KTable.
> >>>>
> >>>> Have I misunderstood something or is this relatively common use case
> not
> >>>> supported with the `punctuate` callback?
> >>>>
> >>>> Many thanks,
> >>>> Elliot
> >>>>
> >>>> [1]
> >>>> http://docs.confluent.io/3.1.2/streams/developer-guide.html#defining-a-stream-processor
> >>>> (see the "Attention" box)
> >>>>
> >>>
> >>
> >>
> >
>
>


Re: Kafka Streams punctuate with slow-changing KTables

2017-02-01 Thread Elliot Crosby-McCullough
Yes, it does make sense, thank you; that's what I thought the behaviour was.

I think having the option of triggering at least every n real-time seconds
(or whatever period) would be very useful, as I can see a number of
situations where updates to some tables might be very infrequent indeed.

To provide a concrete example, this KTable will be holding the dimensions
of a star schema while the KStream will be holding the facts.  If the
KTable (or a partition thereof) is limited to one dimension type (i.e.
one-to-one with the real star schema tables) then in some cases it will be
hours or days apart.  The KTable representing the `date` dimension will not
update faster than once per day.

Likewise using kafka as a changelog for a table like Rails'
`schema_migrations` could easily go weeks without an update.

Until the decision is made regarding the timing, would it be best to ignore
`punctuate` entirely and trigger everything message by message via
`process`?
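
That message-by-message fallback can be sketched as a toy model. This is an
editorial illustration in Python, not the Kafka Streams API (which is Java),
and the class and method names are hypothetical: inside each `process()`
call, check how much real time has elapsed and flush when the interval has
passed.

```python
import time

class WallClockFlushProcessor:
    """Toy model: flush on elapsed wall-clock time from inside process(),
    instead of relying on stream-time punctuate()."""

    def __init__(self, flush_interval_secs, clock=time.monotonic):
        self.flush_interval_secs = flush_interval_secs
        self.clock = clock            # injectable so the sketch is testable
        self.last_flush = clock()
        self.buffer = []
        self.flushed = []             # batches flushed so far

    def process(self, record):
        self.buffer.append(record)
        now = self.clock()
        # Flush on real elapsed time, regardless of record timestamps.
        if now - self.last_flush >= self.flush_interval_secs:
            self.flushed.append(list(self.buffer))
            self.buffer.clear()
            self.last_flush = now

# One record every 10 "seconds" on a fake clock; a 30-second flush
# interval fires on the third record.
fake_now = [0.0]
proc = WallClockFlushProcessor(30.0, clock=lambda: fake_now[0])
for i in range(5):
    fake_now[0] += 10.0
    proc.process("record-%d" % i)
assert proc.flushed == [["record-0", "record-1", "record-2"]]
```

One caveat still applies: `process()` only runs when a record actually
arrives, so a completely idle task never flushes either.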

On 1 February 2017 at 17:43, Matthias J. Sax <matth...@confluent.io> wrote:

> One thing to add:
>
> There are plans/ideas to change punctuate() semantics to "system time"
> instead of "stream time". Would this be helpful for your use case?
>
>
> -Matthias
>
> On 2/1/17 9:41 AM, Matthias J. Sax wrote:
> > Yes and no.
> >
> > It does not depend on the number of tuples but on the timestamps of the
> > tuples.
> >
> > I would assume that records in the high volume stream have timestamps
> > that are only a few milliseconds from each other, while for the low
> > volume KTable, records have timestamp differences that are much bigger
> > (maybe seconds).
> >
> > Thus, even if you schedule a punctuation every 30 seconds, it will get
> > triggered as expected. As you get KTable input on a second basis, that
> > advances KTable time in larger steps -- thus the KTable always "catches up".
> >
> > Only for the (real time) case that a single partition does not make
> > progress, because no new data gets appended for longer than your
> > punctuation interval, some calls to punctuate might not fire.
> >
> > Let's say the KTable does not get an update for 5 minutes; then you
> > would miss 9 calls to punctuate() and get only a single call after the
> > KTable update. (Of course, only if all partitions advance time
> > accordingly.)
> >
> >
> > Does this make sense?
> >
> >
> > -Matthias
> >
> > On 2/1/17 7:37 AM, Elliot Crosby-McCullough wrote:
> >> Hi there,
> >>
> >> I've been reading through the Kafka Streams documentation and there
> >> seems to be a tricky limitation that I'd like to make sure I've
> >> understood correctly.
> >>
> >> The docs[1] talk about the `punctuate` callback being based on stream
> >> time and that all incoming partitions of all incoming topics must have
> >> progressed through the minimum time interval for `punctuate` to be
> >> called.
> >>
> >> This seems like a problem for situations where you have one very fast
> >> and one very slow stream being processed together, for example joining
> >> a fast-moving KStream to a slow-changing KTable.
> >>
> >> Have I misunderstood something, or is this relatively common use case
> >> not supported with the `punctuate` callback?
> >>
> >> Many thanks,
> >> Elliot
> >>
> >> [1]
> >> http://docs.confluent.io/3.1.2/streams/developer-guide.html#defining-a-stream-processor
> >> (see the "Attention" box)
> >>
> >
>
>
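
The stream-time semantics described in the quoted reply can be sketched as a
toy model. This is an editorial illustration in Python with hypothetical
names, not the actual Kafka Streams internals: the task's stream time is the
minimum timestamp seen across its input partitions, and intervals that
stream time jumps over collapse into a single punctuation, as described in
the thread.

```python
class StreamTimePunctuator:
    """Toy model of stream-time punctuation: stream time is the minimum
    timestamp across input partitions, so one stalled partition pins it."""

    def __init__(self, interval_ms):
        self.interval_ms = interval_ms
        self.partition_time = {}    # partition -> latest record timestamp
        self.last_punctuate = None  # stream time of the last punctuation
        self.punctuate_calls = []   # stream times at which punctuate() fired

    def on_record(self, partition, timestamp_ms):
        self.partition_time[partition] = timestamp_ms
        stream_time = min(self.partition_time.values())
        if self.last_punctuate is None:
            self.last_punctuate = stream_time
        elif stream_time - self.last_punctuate >= self.interval_ms:
            # Skipped intervals collapse into a single punctuate() call.
            self.punctuate_calls.append(stream_time)
            self.last_punctuate = stream_time

# 30-second punctuation interval; KStream records every second, but the
# KTable partition stays silent for 5 minutes and pins stream time at 0.
punct = StreamTimePunctuator(interval_ms=30_000)
punct.on_record("kstream-0", 0)
punct.on_record("ktable-0", 0)
for t in range(1_000, 300_000, 1_000):
    punct.on_record("kstream-0", t)
assert punct.punctuate_calls == []          # nothing fired in 5 minutes
punct.on_record("ktable-0", 300_000)        # one KTable update arrives
assert punct.punctuate_calls == [299_000]   # a single call, not ~10
```

This reproduces the behaviour described above: five minutes of KStream
traffic trigger no punctuation while the KTable partition is silent, and the
eventual KTable update yields one call rather than the nine or ten missed
intervals.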


Kafka Streams punctuate with slow-changing KTables

2017-02-01 Thread Elliot Crosby-McCullough
Hi there,

I've been reading through the Kafka Streams documentation and there seems
to be a tricky limitation that I'd like to make sure I've understood
correctly.

The docs[1] talk about the `punctuate` callback being based on stream time
and that all incoming partitions of all incoming topics must have
progressed through the minimum time interval for `punctuate` to be called.

This seems like a problem for situations where you have one very fast
and one very slow stream being processed together, for example joining a
fast-moving KStream to a slow-changing KTable.

Have I misunderstood something, or is this relatively common use case not
supported with the `punctuate` callback?

Many thanks,
Elliot

[1]
http://docs.confluent.io/3.1.2/streams/developer-guide.html#defining-a-stream-processor
(see the "Attention" box)