Hi Elliot,

With GlobalKTables your processor wouldn't be able to write directly to the
table - they are read-only from all threads except the thread that keeps
them up-to-date.
You could potentially write the dimension data to the topic that is the
source of the GlobalKTable, but there is no guarantee that the GlobalKTable
will have been updated before you receive another message that is referring
to the same dimension. So you'd need to handle this somehow yourself.

On Thu, 2 Feb 2017 at 08:26 Elliot Crosby-McCullough <elliot...@gmail.com>
wrote:

> Sorry I left out too much context there.
>
> The current plan is to take a raw stream of events as a source and split
> them into a stream of facts and a table (or tables) of dimensions.
>
> Because this is denormalising the data, we only want one copy of each
> dimension entry despite the original events repeating them over and over.
>
> The current plan is therefore to query the dimension table when a relevant
> event comes in so we can apply the following logic:
> 1. If there's a matching dimension entry already in the table, reference
> that in the fact
> 2. If not, create one in the table and reference it in the fact
>
> Hence find or create.
>
> This means that the tables will need to be read/write in some way by the
> processor.  I referred to it as "first pass" because there will be
> downstream processors in this app or another which will be reading the fact
> and dimension topics and putting them into DBs like redshift.
>
> On 1 February 2017 at 22:40, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > I am not sure if I can follow... what do you mean by "find or create"
> > semantics?
> >
> > What do you mean by "my first pass processor"?
> >
> >
> > -Matthias
> >
> > On 2/1/17 12:24 PM, Elliot Crosby-McCullough wrote:
> > > Ah, bueno!
> > >
> > > The first use case I have is actually to split a raw event stream into
> > > facts and dimensions, but it looks like this is still the right
> solution
> > as
> > > interactive queries can be done against it for "find or create"
> > semantics.
> > >
> > > The KIP mentions that the GlobalKTable "will only be used for doing
> > > lookups", but presumably my first pass processor can still output new
> > > dimension entries to the topic that the table is backed by?  Again for
> > > "find or create".
> > >
> > > On 1 February 2017 at 19:21, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> > >
> > >> Thanks!
> > >>
> > >> About your example: in upcoming 0.10.2 release we will have
> > >> KStream-GlobalKTable join that is designed for exactly your use case
> to
> > >> enrich a "fact stream" with dimension tables.
> > >>
> > >> If you use this feature, "stream time" will not depend on the
> > >> GlobalKTables (but only only the "main topology") and thus the current
> > >> punctuate() issues should be resolved for this case.
> > >>
> > >> cf.
> > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > >> 99%3A+Add+Global+Tables+to+Kafka+Streams
> > >>
> > >>
> > >>
> > >> -Matthias
> > >>
> > >> On 2/1/17 10:31 AM, Elliot Crosby-McCullough wrote:
> > >>> Yes it does make sense, thank you, that's what I thought the
> behaviour
> > >> was.
> > >>>
> > >>> I think having the option of triggering at least every n real-time
> > >> seconds
> > >>> (or whatever period) would be very useful, as I can see a number of
> > >>> situations where the time between updates to some tables might be
> very
> > >>> infrequent indeed.
> > >>>
> > >>> To provide a concrete example, this KTable will be holding the
> > dimensions
> > >>> of a star schema while the KStream will be holding the facts.  If the
> > >>> KTable (or a partition thereof) is limited to one dimension type
> (i.e.
> > >>> one-to-one with the real star schema tables) then in some cases it
> will
> > >> be
> > >>> hours or days apart.  The KTable representing the `date` dimension
> will
> > >> not
> > >>> update faster than once per day.
> > >>>
> > >>> Likewise using kafka as a changelog for a table like Rails'
> > >>> `schema_migrations` could easily go weeks without an update.
> > >>>
> > >>> Until the decision is made regarding the timing would it be best to
> > >> ignore
> > >>> `punctuate` entirely and trigger everything message by message via
> > >>> `process`?
> > >>>
> > >>> On 1 February 2017 at 17:43, Matthias J. Sax <matth...@confluent.io>
> > >> wrote:
> > >>>
> > >>>> One thing to add:
> > >>>>
> > >>>> There are plans/ideas to change punctuate() semantics to "system
> time"
> > >>>> instead of "stream time". Would this be helpful for your use case?
> > >>>>
> > >>>>
> > >>>> -Matthias
> > >>>>
> > >>>> On 2/1/17 9:41 AM, Matthias J. Sax wrote:
> > >>>>> Yes and no.
> > >>>>>
> > >>>>> It does not depend on the number of tuples but on the timestamps of
> > the
> > >>>>> tuples.
> > >>>>>
> > >>>>> I would assume, that records in the high volume stream have
> > timestamps
> > >>>>> that are only a few milliseconds from each other, while for the low
> > >>>>> volume KTable, record have timestamp differences that are much
> bigger
> > >>>>> (maybe seconds).
> > >>>>>
> > >>>>> Thus, even if you schedule a punctuation every 30 seconds, it will
> > get
> > >>>>> triggered as expected. As you get KTable input on a second basis
> that
> > >>>>> advanced KTable time in larger steps -- thus KTable always "catches
> > >> up".
> > >>>>>
> > >>>>> Only for the (real time) case, that a single partition does not
> make
> > >>>>> process because no new data gets appended that is longer than your
> > >>>>> punctuation interval, some calls to punctuate might not fire.
> > >>>>>
> > >>>>> Let's say the KTable does not get an update for 5 Minutes, than you
> > >>>>> would miss 9 calls to punctuate(), and get only a single call after
> > the
> > >>>>> KTable update. (Of course, only if all partitions advance time
> > >>>> accordingly.)
> > >>>>>
> > >>>>>
> > >>>>> Does this make sense?
> > >>>>>
> > >>>>>
> > >>>>> -Matthias
> > >>>>>
> > >>>>> On 2/1/17 7:37 AM, Elliot Crosby-McCullough wrote:
> > >>>>>> Hi there,
> > >>>>>>
> > >>>>>> I've been reading through the Kafka Streams documentation and
> there
> > >>>> seems
> > >>>>>> to be a tricky limitation that I'd like to make sure I've
> understood
> > >>>>>> correctly.
> > >>>>>>
> > >>>>>> The docs[1] talk about the `punctuate` callback being based on
> > stream
> > >>>> time
> > >>>>>> and that all incoming partitions of all incoming topics must have
> > >>>>>> progressed through the minimum time interval for `punctuate` to be
> > >>>> called.
> > >>>>>>
> > >>>>>> This seems to be like a problem for situations where you have one
> > very
> > >>>> fast
> > >>>>>> and one very slow stream being processed together, for example
> > >> joining a
> > >>>>>> fast-moving KStream to a slow-changing KTable.
> > >>>>>>
> > >>>>>> Have I misunderstood something or is this relatively common use
> case
> > >> not
> > >>>>>> supported with the `punctuate` callback?
> > >>>>>>
> > >>>>>> Many thanks,
> > >>>>>> Elliot
> > >>>>>>
> > >>>>>> [1]
> > >>>>>> http://docs.confluent.io/3.1.2/streams/developer-guide.
> > >>>> html#defining-a-stream-processor
> > >>>>>> (see the "Attention" box)
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>>
> > >>>
> > >>
> > >>
> > >
> >
> >
>

Reply via email to