Hi Michael,

My confusion was that the events are being created, transferred, and
received several seconds apart (longer than the punctuate schedule) with no
stalling, because I'm triggering them by hand; so regardless of which
mechanism is used for timing, punctuate should still be called.

That said, I've just noticed in the callout box that stream time is only
advanced if all input topics have new data, which in my testing is not the
case. So I suppose I will need to attach the processor to each input topic
rather than processing them all at the same time (in this use case they
were being split back out in the processor).
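For anyone else hitting this, the behaviour from the callout box can be shown with a toy simulation. This is not Kafka code (the class and field names are my own illustration); it just models stream-time as the minimum of the latest timestamps across all inputs, which is why punctuate never fires while one input topic sits idle:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (not Kafka code) of stream-time-based punctuation:
// stream time only advances to the minimum timestamp seen across
// all inputs, so punctuate cannot fire until *every* input has data.
public class StreamTimeSim {
    private final Map<String, Long> latestPerInput = new HashMap<>();
    private final long intervalMs;
    private long lastPunctuate = 0L;
    public int punctuateCount = 0;

    public StreamTimeSim(long intervalMs, String... inputs) {
        this.intervalMs = intervalMs;
        for (String in : inputs) {
            latestPerInput.put(in, 0L);
        }
    }

    // Deliver a record with the given event timestamp on one input.
    public void process(String input, long timestampMs) {
        latestPerInput.put(input, timestampMs);
        // Stream time is held back to the slowest input.
        long streamTime = latestPerInput.values().stream()
                .mapToLong(Long::longValue).min().orElse(0L);
        while (streamTime - lastPunctuate >= intervalMs) {
            lastPunctuate += intervalMs;
            punctuateCount++; // stands in for a punctuate() call
        }
    }
}
```

So a record on topic A alone never triggers punctuate; only once topic B also gets a record does stream time (the minimum) advance and the scheduled punctuations fire.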

Thanks,
Elliot

On 28 March 2017 at 10:18, Michael Noll <mich...@confluent.io> wrote:

> Elliot,
>
> in the current API, `punctuate()` is called based on the current
> stream-time (which defaults to event-time), not based on the current
> wall-clock time / processing-time.  See http://docs.confluent.io/current/streams/faq.html#why-is-punctuate-not-called.  The stream-time is
> advanced only when new input records are coming in, so if there's e.g. a
> stall on incoming records, then `punctuate()` will not be called.
>
> If you need to schedule a call every N minutes of wall-clock time you'd
> need to use your own scheduler.
>
> Does that help?
> Michael
>
>
>
> On Tue, Mar 28, 2017 at 10:58 AM, Elliot Crosby-McCullough <
> elliot.crosby-mccullo...@freeagent.com> wrote:
>
> > Hi there,
> >
> > I've written a simple processor which expects to have #process called on
> > it for each message and configures regular punctuate calls via
> > `context.schedule`.
> >
> > Regardless of what configuration I try for timestamp extraction I cannot
> > get #punctuate to be called, despite #process being called for every
> > message (which are being sent several seconds apart).  I've set the
> > schedule as low as 1 (though the docs aren't clear whether that's micro,
> > milli, or just seconds) and tried both the wallclock time extractor and
> > the default time extractor in both the global config and the state store
> > serde.
> >
> > These particular messages are being generated by another kafka streams
> > DSL application and I'm using kafka 0.10.2.0, so presumably they also
> > have automatically embedded timestamps.
> >
> > I can't for the life of me figure out what's going on.  Could you clue me
> > in?
> >
> > Thanks,
> > Elliot
> >
>
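Michael's suggestion above, using your own scheduler when you need wall-clock-time behaviour, could be sketched with the JDK's ScheduledExecutorService. The class and method names below are my own illustration, not a Kafka Streams API, and note that any processor state touched from the scheduler thread would need its own synchronization, since Streams tasks are not thread-safe:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch of the "use your own scheduler" workaround: drive periodic
// work off wall-clock time with a plain JDK scheduler, independent of
// whether stream-time is advancing.
public class WallClockScheduler {

    // Run `task` every periodMs of wall-clock time; the caller is
    // responsible for shutting the returned executor down.
    public static ScheduledExecutorService every(long periodMs, Runnable task) {
        ScheduledExecutorService exec =
                Executors.newSingleThreadScheduledExecutor();
        exec.scheduleAtFixedRate(task, periodMs, periodMs,
                TimeUnit.MILLISECONDS);
        return exec;
    }
}
```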
