Right.  Maybe it's best to use some kind of idempotent foreign key then, or
at least a small in-thread cache.  Thanks for the info.
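
A rough sketch of what that could look like, in plain Java with no Kafka
dependency. The names (DimensionKeys, keyFor, firstSighting) and the cache
bound are made up for illustration and are not part of any Kafka API. The
idea is that the key is derived from the dimension's own content, so writing
the same dimension to the backing topic twice should just upsert the same key.
The in-thread cache only cuts down on repeat writes; it is an assumption-laden
optimisation, not a correctness guarantee.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

class DimensionKeys {
    // Hypothetical bound on the per-thread cache; tune for the real workload.
    private static final int MAX_CACHED_KEYS = 10_000;

    // Access-ordered LinkedHashMap used as a small LRU record of keys this
    // thread has already emitted. Not thread-safe: use one instance per thread.
    private final Map<String, Boolean> seen =
        new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > MAX_CACHED_KEYS;
            }
        };

    // Derives the same key for the same dimension content every time, so a
    // repeated "create" overwrites the entry instead of duplicating it.
    // Each part is length-prefixed so ("ab", "c") and ("a", "bc") differ.
    static String keyFor(String dimensionType, String... attributes) {
        StringBuilder sb = new StringBuilder();
        sb.append(dimensionType.length()).append(':').append(dimensionType);
        for (String attribute : attributes) {
            sb.append(attribute.length()).append(':').append(attribute);
        }
        byte[] bytes = sb.toString().getBytes(StandardCharsets.UTF_8);
        // Name-based (type 3) UUID: deterministic for identical input bytes.
        return UUID.nameUUIDFromBytes(bytes).toString();
    }

    // Returns true the first time this instance sees the key (or the first
    // time after the key has been evicted), false otherwise.
    boolean firstSighting(String key) {
        return seen.put(key, Boolean.TRUE) == null;
    }
}
```

The processor would compute keyFor(...) for every event, reference that key in
the fact, and only emit the dimension record when firstSighting(...) returns
true. If an entry is evicted or another thread emits the same dimension, the
worst case is a redundant write with an identical key.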

On 2 February 2017 at 09:46, Damian Guy <damian....@gmail.com> wrote:

> Hi Elliot,
>
> With GlobalKTables your processor wouldn't be able to write directly to the
> table - they are read-only from all threads except the thread that keeps
> them up-to-date.
> You could potentially write the dimension data to the topic that is the
> source of the GlobalKTable, but there is no guarantee that the GlobalKTable
> will have been updated before you receive another message that is referring
> to the same dimension. So you'd need to handle this somehow yourself.
>
> On Thu, 2 Feb 2017 at 08:26 Elliot Crosby-McCullough <elliot...@gmail.com>
> wrote:
>
> > Sorry I left out too much context there.
> >
> > The current plan is to take a raw stream of events as a source and split
> > them into a stream of facts and a table (or tables) of dimensions.
> >
> > Because this is denormalising the data, we only want one copy of each
> > dimension entry despite the original events repeating them over and over.
> >
> > The current plan is therefore to query the dimension table when a relevant
> > event comes in so we can apply the following logic:
> > 1. If there's a matching dimension entry already in the table, reference
> > that in the fact
> > 2. If not, create one in the table and reference it in the fact
> >
> > Hence find or create.
> >
> > This means that the tables will need to be read/write in some way by the
> > processor.  I referred to it as "first pass" because there will be
> > downstream processors, in this app or another, which will be reading the
> > fact and dimension topics and putting them into DBs like Redshift.
> >
> > On 1 February 2017 at 22:40, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> > > I am not sure if I can follow... what do you mean by "find or create"
> > > semantics?
> > >
> > > What do you mean by "my first pass processor"?
> > >
> > >
> > > -Matthias
> > >
> > > On 2/1/17 12:24 PM, Elliot Crosby-McCullough wrote:
> > > > Ah, bueno!
> > > >
> > > > > The first use case I have is actually to split a raw event stream into
> > > > > facts and dimensions, but it looks like this is still the right solution,
> > > > > as interactive queries can be done against it for "find or create"
> > > > > semantics.
> > > >
> > > > > The KIP mentions that the GlobalKTable "will only be used for doing
> > > > > lookups", but presumably my first pass processor can still output new
> > > > > dimension entries to the topic that the table is backed by?  Again for
> > > > > "find or create".
> > > >
> > > > On 1 February 2017 at 19:21, Matthias J. Sax <matth...@confluent.io>
> > > wrote:
> > > >
> > > >> Thanks!
> > > >>
> > > > >> About your example: in the upcoming 0.10.2 release we will have a
> > > > >> KStream-GlobalKTable join that is designed for exactly your use case: to
> > > > >> enrich a "fact stream" with dimension tables.
> > > > >>
> > > > >> If you use this feature, "stream time" will not depend on the
> > > > >> GlobalKTables (but only on the "main topology") and thus the current
> > > > >> punctuate() issues should be resolved for this case.
> > > >>
> > > >> cf.
> > > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-99%3A+Add+Global+Tables+to+Kafka+Streams
> > > >>
> > > >>
> > > >>
> > > >> -Matthias
> > > >>
> > > >> On 2/1/17 10:31 AM, Elliot Crosby-McCullough wrote:
> > > > >>> Yes it does make sense, thank you, that's what I thought the behaviour
> > > > >>> was.
> > > >>>
> > > > >>> I think having the option of triggering at least every n real-time
> > > > >>> seconds (or whatever period) would be very useful, as I can see a number
> > > > >>> of situations where updates to some tables might be very infrequent
> > > > >>> indeed.
> > > >>>
> > > > >>> To provide a concrete example, this KTable will be holding the dimensions
> > > > >>> of a star schema while the KStream will be holding the facts.  If the
> > > > >>> KTable (or a partition thereof) is limited to one dimension type (i.e.
> > > > >>> one-to-one with the real star schema tables) then in some cases its
> > > > >>> updates will be hours or days apart.  The KTable representing the `date`
> > > > >>> dimension will not update faster than once per day.
> > > >>>
> > > > >>> Likewise, using Kafka as a changelog for a table like Rails'
> > > > >>> `schema_migrations` could easily go weeks without an update.
> > > >>>
> > > > >>> Until the decision is made regarding the timing, would it be best to
> > > > >>> ignore `punctuate` entirely and trigger everything message by message via
> > > > >>> `process`?
> > > >>>
> > > >>> On 1 February 2017 at 17:43, Matthias J. Sax <
> matth...@confluent.io>
> > > >> wrote:
> > > >>>
> > > >>>> One thing to add:
> > > >>>>
> > > > >>>> There are plans/ideas to change punctuate() semantics to "system time"
> > > > >>>> instead of "stream time". Would this be helpful for your use case?
> > > >>>>
> > > >>>>
> > > >>>> -Matthias
> > > >>>>
> > > >>>> On 2/1/17 9:41 AM, Matthias J. Sax wrote:
> > > >>>>> Yes and no.
> > > >>>>>
> > > > >>>>> It does not depend on the number of tuples but on the timestamps of the
> > > > >>>>> tuples.
> > > > >>>>>
> > > > >>>>> I would assume that records in the high-volume stream have timestamps
> > > > >>>>> that are only a few milliseconds apart, while for the low-volume
> > > > >>>>> KTable, records have timestamp differences that are much bigger
> > > > >>>>> (maybe seconds).
> > > >>>>>
> > > > >>>>> Thus, even if you schedule a punctuation every 30 seconds, it will get
> > > > >>>>> triggered as expected. As you get KTable input on a second basis, each
> > > > >>>>> record advances KTable time in larger steps -- thus the KTable always
> > > > >>>>> "catches up".
> > > >>>>>
> > > > >>>>> Only in the (real-time) case where a single partition does not make
> > > > >>>>> progress, because no new data gets appended for longer than your
> > > > >>>>> punctuation interval, might some calls to punctuate not fire.
> > > >>>>>
> > > > >>>>> Let's say the KTable does not get an update for 5 minutes; then you
> > > > >>>>> would miss 9 calls to punctuate(), and get only a single call after the
> > > > >>>>> KTable update. (Of course, only if all partitions advance time
> > > > >>>>> accordingly.)
> > > >>>>>
> > > >>>>>
> > > >>>>> Does this make sense?
> > > >>>>>
> > > >>>>>
> > > >>>>> -Matthias
> > > >>>>>
> > > >>>>> On 2/1/17 7:37 AM, Elliot Crosby-McCullough wrote:
> > > >>>>>> Hi there,
> > > >>>>>>
> > > > >>>>>> I've been reading through the Kafka Streams documentation and there
> > > > >>>>>> seems to be a tricky limitation that I'd like to make sure I've
> > > > >>>>>> understood correctly.
> > > >>>>>>
> > > > >>>>>> The docs[1] say that the `punctuate` callback is based on stream time
> > > > >>>>>> and that all incoming partitions of all incoming topics must have
> > > > >>>>>> progressed through the minimum time interval for `punctuate` to be
> > > > >>>>>> called.
> > > >>>>>>
> > > > >>>>>> This seems like a problem for situations where you have one very fast
> > > > >>>>>> and one very slow stream being processed together, for example
> > > > >>>>>> joining a fast-moving KStream to a slow-changing KTable.
> > > >>>>>>
> > > > >>>>>> Have I misunderstood something, or is this relatively common use case
> > > > >>>>>> not supported with the `punctuate` callback?
> > > >>>>>>
> > > >>>>>> Many thanks,
> > > >>>>>> Elliot
> > > >>>>>>
> > > >>>>>> [1]
> > > > >>>>>> http://docs.confluent.io/3.1.2/streams/developer-guide.html#defining-a-stream-processor
> > > >>>>>> (see the "Attention" box)
> > > >>>>>>
> > > >>>>>
> > > >>>>
> > > >>>>
> > > >>>
> > > >>
> > > >>
> > > >
> > >
> > >
> >
>
