Right. Maybe it's best to use some kind of idempotent foreign key then, or at least a small in-thread cache. Thanks for the info.
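
A minimal sketch of the "idempotent key plus small in-thread cache" idea, in plain Java with no Kafka dependency. Everything here is an assumption for illustration: `DimensionResolver`, `keyFor`, `findOrCreate` and the `emitted` list are hypothetical names, and `emitted` merely stands in for records a real processor would forward to the dimension topic.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical helper, not part of Kafka Streams.
class DimensionResolver {
    // Small per-thread LRU cache of dimension keys already emitted by this thread.
    private final Map<String, Boolean> cache;

    // Stand-in for the dimension topic: keys this thread decided to write.
    final List<String> emitted = new ArrayList<>();

    DimensionResolver(final int maxEntries) {
        // Access-ordered LinkedHashMap that drops the least recently used entry.
        this.cache = new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > maxEntries;
            }
        };
    }

    // Deterministic key: the same dimension content always yields the same key,
    // so writing the same dimension twice is harmless (idempotent).
    static String keyFor(String dimensionType, String naturalValue) {
        String name = dimensionType + "|" + naturalValue;
        return UUID.nameUUIDFromBytes(name.getBytes(StandardCharsets.UTF_8)).toString();
    }

    // "Find or create": return the key to reference in the fact, and emit the
    // dimension only if this thread has not seen it recently.
    String findOrCreate(String dimensionType, String naturalValue) {
        String key = keyFor(dimensionType, naturalValue);
        if (cache.putIfAbsent(key, Boolean.TRUE) == null) {
            emitted.add(key);
        }
        return key;
    }
}
```

Because the key is derived from the dimension's content, a cache miss after eviction (or a second thread seeing the same dimension) only produces a duplicate write under the same key, which a compacted topic would collapse. The cache just reduces how often that happens.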

On 2 February 2017 at 09:46, Damian Guy <damian....@gmail.com> wrote:

> Hi Elliot,
>
> With GlobalKTables your processor wouldn't be able to write directly to the
> table - they are read-only from all threads except the thread that keeps
> them up-to-date.
> You could potentially write the dimension data to the topic that is the
> source of the GlobalKTable, but there is no guarantee that the GlobalKTable
> will have been updated before you receive another message that is referring
> to the same dimension. So you'd need to handle this somehow yourself.
>
> On Thu, 2 Feb 2017 at 08:26 Elliot Crosby-McCullough <elliot...@gmail.com>
> wrote:
>
> > Sorry, I left out too much context there.
> >
> > The current plan is to take a raw stream of events as a source and split
> > them into a stream of facts and a table (or tables) of dimensions.
> >
> > Because this is denormalising the data, we only want one copy of each
> > dimension entry despite the original events repeating them over and over.
> >
> > The current plan is therefore to query the dimension table when a relevant
> > event comes in so we can apply the following logic:
> > 1. If there's a matching dimension entry already in the table, reference
> >    that in the fact
> > 2. If not, create one in the table and reference it in the fact
> >
> > Hence find or create.
> >
> > This means that the tables will need to be read/write in some way by the
> > processor. I referred to it as "first pass" because there will be
> > downstream processors in this app or another which will be reading the
> > fact and dimension topics and putting them into DBs like Redshift.
> >
> > On 1 February 2017 at 22:40, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> > > I am not sure if I can follow... what do you mean by "find or create"
> > > semantics?
> > >
> > > What do you mean by "my first pass processor"?
> > >
> > >
> > > -Matthias
> > >
> > > On 2/1/17 12:24 PM, Elliot Crosby-McCullough wrote:
> > > > Ah, good!
> > > >
> > > > The first use case I have is actually to split a raw event stream
> > > > into facts and dimensions, but it looks like this is still the right
> > > > solution, as interactive queries can be done against it for "find or
> > > > create" semantics.
> > > >
> > > > The KIP mentions that the GlobalKTable "will only be used for doing
> > > > lookups", but presumably my first pass processor can still output new
> > > > dimension entries to the topic that the table is backed by? Again for
> > > > "find or create".
> > > >
> > > > On 1 February 2017 at 19:21, Matthias J. Sax <matth...@confluent.io>
> > > > wrote:
> > > >
> > > >> Thanks!
> > > >>
> > > >> About your example: in the upcoming 0.10.2 release we will have a
> > > >> KStream-GlobalKTable join that is designed for exactly your use case,
> > > >> to enrich a "fact stream" with dimension tables.
> > > >>
> > > >> If you use this feature, "stream time" will not depend on the
> > > >> GlobalKTables (but only on the "main topology") and thus the current
> > > >> punctuate() issues should be resolved for this case.
> > > >>
> > > >> cf.
> > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-99%3A+Add+Global+Tables+to+Kafka+Streams
> > > >>
> > > >>
> > > >> -Matthias
> > > >>
> > > >> On 2/1/17 10:31 AM, Elliot Crosby-McCullough wrote:
> > > >>> Yes it does make sense, thank you, that's what I thought the
> > > >>> behaviour was.
> > > >>>
> > > >>> I think having the option of triggering at least every n real-time
> > > >>> seconds (or whatever period) would be very useful, as I can see a
> > > >>> number of situations where updates to some tables might be very
> > > >>> infrequent indeed.
> > > >>>
> > > >>> To provide a concrete example, this KTable will be holding the
> > > >>> dimensions of a star schema while the KStream will be holding the
> > > >>> facts. If the KTable (or a partition thereof) is limited to one
> > > >>> dimension type (i.e. one-to-one with the real star schema tables)
> > > >>> then in some cases updates will be hours or days apart. The KTable
> > > >>> representing the `date` dimension will not update faster than once
> > > >>> per day.
> > > >>>
> > > >>> Likewise using Kafka as a changelog for a table like Rails'
> > > >>> `schema_migrations` could easily go weeks without an update.
> > > >>>
> > > >>> Until the decision is made regarding the timing, would it be best to
> > > >>> ignore `punctuate` entirely and trigger everything message by
> > > >>> message via `process`?
> > > >>>
> > > >>> On 1 February 2017 at 17:43, Matthias J. Sax <matth...@confluent.io>
> > > >>> wrote:
> > > >>>
> > > >>>> One thing to add:
> > > >>>>
> > > >>>> There are plans/ideas to change punctuate() semantics to "system
> > > >>>> time" instead of "stream time". Would this be helpful for your use
> > > >>>> case?
> > > >>>>
> > > >>>>
> > > >>>> -Matthias
> > > >>>>
> > > >>>> On 2/1/17 9:41 AM, Matthias J. Sax wrote:
> > > >>>>> Yes and no.
> > > >>>>>
> > > >>>>> It does not depend on the number of tuples but on the timestamps
> > > >>>>> of the tuples.
> > > >>>>>
> > > >>>>> I would assume that records in the high volume stream have
> > > >>>>> timestamps that are only a few milliseconds from each other, while
> > > >>>>> for the low volume KTable, records have timestamp differences that
> > > >>>>> are much bigger (maybe seconds).
> > > >>>>>
> > > >>>>> Thus, even if you schedule a punctuation every 30 seconds, it will
> > > >>>>> get triggered as expected. As you get KTable input on a per-second
> > > >>>>> basis, that advances KTable time in larger steps -- thus the KTable
> > > >>>>> always "catches up".
> > > >>>>>
> > > >>>>> Only in the (real-time) case that a single partition does not make
> > > >>>>> progress, because no new data gets appended for longer than your
> > > >>>>> punctuation interval, some calls to punctuate might not fire.
> > > >>>>>
> > > >>>>> Let's say the KTable does not get an update for 5 minutes, then
> > > >>>>> you would miss 9 calls to punctuate(), and get only a single call
> > > >>>>> after the KTable update. (Of course, only if all partitions
> > > >>>>> advance time accordingly.)
> > > >>>>>
> > > >>>>>
> > > >>>>> Does this make sense?
> > > >>>>>
> > > >>>>>
> > > >>>>> -Matthias
> > > >>>>>
> > > >>>>> On 2/1/17 7:37 AM, Elliot Crosby-McCullough wrote:
> > > >>>>>> Hi there,
> > > >>>>>>
> > > >>>>>> I've been reading through the Kafka Streams documentation and
> > > >>>>>> there seems to be a tricky limitation that I'd like to make sure
> > > >>>>>> I've understood correctly.
> > > >>>>>>
> > > >>>>>> The docs[1] talk about the `punctuate` callback being based on
> > > >>>>>> stream time and that all incoming partitions of all incoming
> > > >>>>>> topics must have progressed through the minimum time interval for
> > > >>>>>> `punctuate` to be called.
> > > >>>>>>
> > > >>>>>> This seems like a problem for situations where you have one very
> > > >>>>>> fast and one very slow stream being processed together, for
> > > >>>>>> example joining a fast-moving KStream to a slow-changing KTable.
> > > >>>>>>
> > > >>>>>> Have I misunderstood something or is this relatively common use
> > > >>>>>> case not supported with the `punctuate` callback?
> > > >>>>>>
> > > >>>>>> Many thanks,
> > > >>>>>> Elliot
> > > >>>>>>
> > > >>>>>> [1]
> > > >>>>>> http://docs.confluent.io/3.1.2/streams/developer-guide.html#defining-a-stream-processor
> > > >>>>>> (see the "Attention" box)
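
The stream-time `punctuate` behaviour discussed in the thread can be illustrated with a simplified model. This is not Kafka code: `StreamTimePunctuator`, `onRecord` and `fired` are hypothetical names, and the model assumes only what the thread describes, namely that stream time is the minimum timestamp seen across all input partitions and that intervals skipped by a large jump are not replayed.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of stream-time punctuation; hypothetical, not the Kafka API.
class StreamTimePunctuator {
    private final long intervalMs;
    private final long[] partitionTime;   // highest timestamp seen per partition
    private long nextPunctuateAt;         // stream time at which punctuate is next due

    // Stream times at which the modelled punctuate() call fired.
    final List<Long> fired = new ArrayList<>();

    StreamTimePunctuator(int partitions, long intervalMs) {
        this.intervalMs = intervalMs;
        this.partitionTime = new long[partitions];
        this.nextPunctuateAt = intervalMs;
    }

    void onRecord(int partition, long timestampMs) {
        partitionTime[partition] = Math.max(partitionTime[partition], timestampMs);

        // Stream time only advances as far as the slowest partition.
        long streamTime = Long.MAX_VALUE;
        for (long t : partitionTime) {
            streamTime = Math.min(streamTime, t);
        }

        // Fire at most once per advance; skipped intervals are not made up.
        if (streamTime >= nextPunctuateAt) {
            fired.add(streamTime);
            nextPunctuateAt = streamTime + intervalMs;
        }
    }
}
```

With two partitions and a 30-second interval, feeding partition 0 a record every second for five minutes while partition 1 stays at timestamp 0 fires nothing; a single record on partition 1 at the five-minute mark then fires exactly once, which is the "miss 9 calls, get a single call" case from the thread.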