Hello Adam / Jan / John,

Sorry for being late on this thread! I've finally got some time this
weekend to clean up a load of tasks in my queue (actually, I've also realized
there are a bunch of other things I need to enqueue while cleaning them up,
something I need to improve on my side). So here are my thoughts:

Regarding the APIs: I like the API as currently written in the KIP. More
generally, I'd prefer to keep 1) the one-to-many join functionality as
well as 2) join types other than inner as separate KIPs, since 1) may be worth
a general API refactoring that can benefit not only foreign-key joins but
co-located joins as well (e.g. an extended proposal of
https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup),
and I'm not sure if other join types would actually be needed (maybe left
join still makes sense), so it's better to wait-for-people-to-ask-and-add
than add-something-that-no-one-uses.

Regarding whether we enforce steps 3) / 4) vs. introducing a
KScatteredTable for users to inject their own optimization: I'd prefer to
keep the current option as-is, and my main rationale is room for optimization
inside the Streams internals and API succinctness. For advanced
users who may indeed prefer KScatteredTable to do their own optimization,
and for whom using the Processor API directly is too much work, I think we
can still extend the current API to support it in the future if it becomes
necessary.

Another note about step 4), resolving out-of-order data: as I mentioned
before, I think with KIP-258 (embedding timestamps in the key-value store) we
can actually make this step simpler than the current proposal. In fact, we
can just keep a single final-result store with timestamps and reject values
that have a smaller timestamp. Is that right?
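
To make the "single final-result store with timestamps" idea concrete, here
is a minimal sketch in plain Java. It is not the KIP-258 API:
TimestampedResultStore and putIfNotOlder are hypothetical names, and a
HashMap stands in for the real state store. It only shows the rule of
rejecting a value whose timestamp is smaller than the one already stored.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical in-memory stand-in for a key-value store that keeps a
 * timestamp next to each value (illustration only, not a Kafka Streams class).
 */
final class TimestampedResultStore<K, V> {

    /** A value paired with the timestamp of the record that produced it. */
    private static final class Entry<V> {
        final V value;
        final long timestamp;

        Entry(V value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final Map<K, Entry<V>> store = new HashMap<>();

    /**
     * Applies the update unless it is older than the stored value.
     * Returns true if the update was accepted, false if it was rejected
     * as out of order. Equal timestamps are accepted (assumption).
     */
    boolean putIfNotOlder(K key, V value, long timestamp) {
        Entry<V> current = store.get(key);
        if (current != null && timestamp < current.timestamp) {
            return false; // smaller timestamp: stale join result, drop it
        }
        store.put(key, new Entry<>(value, timestamp));
        return true;
    }

    /** Returns the current value for the key, or null if there is none. */
    V get(K key) {
        Entry<V> entry = store.get(key);
        return entry == null ? null : entry.value;
    }
}
```

With this rule, a late-arriving result for an old foreign key cannot
overwrite the result for the newer one, provided timestamps are inherited
correctly through both repartition steps.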


That's all I have in mind for now. Again, many thanks to Adam for making
such HUGE progress on this KIP!


Guozhang

On Wed, Dec 5, 2018 at 2:40 PM Jan Filipiak <jan.filip...@trivago.com>
wrote:

> If they don't find the time:
> They usually take the opposite path from me :D
> so the answer would be clear.
>
> hence my suggestion to vote.
>
>
> On 04.12.2018 21:06, Adam Bellemare wrote:
> > Hi Guozhang and Matthias
> >
> > I know both of you are quite busy, but we've gotten this KIP to a point
> > where we need more guidance on the API (perhaps a bit of a tie-breaker, if
> > you will). If there is anyone else you think should look at this,
> > please tag them accordingly.
> >
> > The scenario is as such:
> >
> > Current Option:
> > API:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-PublicInterfaces
> > 1) Rekey the data to CombinedKey, and shuffle it to the partition with the
> > foreignKey (repartition 1)
> > 2) Join the data
> > 3) Shuffle the data back to the original node (repartition 2)
> > 4) Resolve out-of-order arrival / race condition due to foreign-key changes.
> >
> > Alternate Option:
> > Perform #1 and #2 above, and return a KScatteredTable.
> > - It would be keyed on a wrapped key function: <CombinedKey<KO, K>, VR>
> > (KO = Other Table Key, K = This Table Key, VR = Joined Result)
> > - KScatteredTable.resolve() would perform #3 and #4, but otherwise a user
> > would be able to perform additional functions directly from the
> > KScatteredTable (TBD - currently out of scope).
> > - John's analysis two emails up is accurate as to the tradeoffs.
> >
> > Current Option is coded as-is. Alternate Option is possible, but will
> > require implementation details to be exposed in the API, along with some
> > new data structures (i.e. CombinedKey).
> >
> > I appreciate any insight into this.
> >
> > Thanks.
> >
> > On Tue, Dec 4, 2018 at 2:59 PM Adam Bellemare <adam.bellem...@gmail.com>
> > wrote:
> >
> >> Hi John
> >>
> >> Thanks for your feedback and assistance. I think your summary is accurate
> >> from my perspective. Additionally, I would like to add that there is a risk
> >> of inconsistent final states without performing the resolution. This is a
> >> major concern for me as most of the data I have dealt with is produced by
> >> relational databases. We have seen a number of cases where a user in the
> >> Rails UI has modified the field (foreign key), realized they made a
> >> mistake, and then updated the field again with a new key. The events are
> >> propagated out as they are produced, and as such we have had real-world
> >> cases where these inconsistencies were propagated downstream as the final
> >> values due to the race conditions in the fanout of the data.
> >>
> >> This solution that I propose values correctness of the final result over
> >> other factors.
> >>
> >> We could always move this function over to using a KScatteredTable
> >> implementation in the future, and simply deprecate this join API in
> >> time. I think I would like to hear more from some of the other major
> >> committers on which course of action they think is best before any
> >> more coding is done.
> >>
> >> Thanks again
> >>
> >> Adam
> >>
> >>
> >> On Mon, Dec 3, 2018 at 8:24 PM John Roesler <j...@confluent.io> wrote:
> >>
> >>> Hi Jan and Adam,
> >>>
> >>> Wow, thanks for doing that test, Adam. Those results are encouraging.
> >>>
> >>> Thanks for your performance experience as well, Jan. I agree that avoiding
> >>> unnecessary join outputs is especially important when the fan-out is so
> >>> high. I suppose this could also be built into the implementation we're
> >>> discussing, but it wouldn't have to be specified in the KIP (since it's an
> >>> API-transparent optimization).
> >>>
> >>> As far as whether or not to re-repartition the data, I didn't bring it up
> >>> because it sounded like the two of you agreed to leave the KIP as-is,
> >>> despite the disagreement.
> >>>
> >>> If you want my opinion, I feel like both approaches are reasonable.
> >>> It sounds like Jan values more the potential for developers to optimize
> >>> their topologies to re-use the intermediate nodes, whereas Adam places
> >>> more value on having a single operator that people can use without extra
> >>> steps at the end.
> >>>
> >>> Personally, although I do find it exceptionally annoying when a framework
> >>> gets in my way when I'm trying to optimize something, it seems better to
> >>> go for a single operation.
> >>> * Encapsulating the internal transitions gives us significant latitude in
> >>> the implementation (for example, joining only at the end, not in the
> >>> middle, to avoid extra data copying and out-of-order resolution; how we
> >>> represent the first repartition keys (combined keys vs. value vectors),
> >>> etc.). If we publish something like a KScatteredTable with the
> >>> right-partitioned joined data, then the API pretty much locks in the
> >>> implementation as well.
> >>> * The API seems simpler to understand and use. I do mean "seems"; if
> >>> anyone wants to make the case that KScatteredTable is actually simpler, I
> >>> think hypothetical usage code would help. From a relational algebra
> >>> perspective, it seems like KTable.join(KTable) should produce a new KTable
> >>> in all cases.
> >>> * That said, there might still be room in the API for a different
> >>> operation like what Jan has proposed to scatter a KTable, and then do
> >>> things like join, re-group, etc. from there... I'm not sure; I haven't
> >>> thought through all the consequences yet.
> >>>
> >>> This is all just my opinion after thinking over the discussion so far...
> >>> -John
> >>>
> >>> On Mon, Dec 3, 2018 at 2:56 PM Adam Bellemare <adam.bellem...@gmail.com>
> >>> wrote:
> >>>
> >>>> Updated the PR to take into account John's feedback.
> >>>>
> >>>> I did some preliminary testing for the performance of the prefixScan. I
> >>>> have attached the file, but I will also include the text in the body here
> >>>> for archival purposes (I am not sure what happens to attached files). I
> >>>> also updated the PR and the KIP accordingly.
> >>>>
> >>>> Summary: It scales exceptionally well for scanning large numbers of
> >>>> records. As Jan mentioned previously, the real issue would be more around
> >>>> processing the resulting records after obtaining them. For instance, it
> >>>> takes approximately 80-120 ms to flush the buffer and a further 35-85 ms
> >>>> to scan 27.5M records, obtaining matches for 2.5M of them. Iterating
> >>>> through the records just to generate a simple count takes ~40 times longer
> >>>> than the flush + scan combined.
> >>>>
> >>>>
> >>>> ============================================================================================
> >>>> Setup:
> >>>> ============================================================================================
> >>>> Java 9 with default settings aside from a 512 MB heap (Xmx512m, Xms512m)
> >>>> CPU: i7 2.2 GHz.
> >>>>
> >>>> Note: I am using a slightly-modified, directly-accessible Kafka Streams
> >>>> RocksDB implementation (RocksDB.java, basically just avoiding the
> >>>> ProcessorContext). There are no modifications to the default RocksDB
> >>>> values provided in the 2.1/trunk release.
> >>>>
> >>>>
> >>>> keysize = 128 bytes
> >>>> valsize = 512 bytes
> >>>>
> >>>> Step 1:
> >>>> Write X positive matching events: (key = prefix + left-padded
> >>>> auto-incrementing integer)
> >>>> Step 2:
> >>>> Write 10X negative matching events (key = left-padded auto-incrementing
> >>>> integer)
> >>>> Step 3:
> >>>> Perform flush
> >>>> Step 4:
> >>>> Perform prefixScan
> >>>> Step 5:
> >>>> Iterate through the returned Iterator and validate the count of expected
> >>>> events.
> >>>>
> >>>>
> >>>> ============================================================================================
> >>>> Results:
> >>>> ============================================================================================
> >>>> X = 1k (11k events total)
> >>>> Flush Time = 39 ms
> >>>> Scan Time = 7 ms
> >>>> 6.9 MB disk
> >>>>
> >>>> --------------------------------------------------------------------------------------------
> >>>> X = 10k (110k events total)
> >>>> Flush Time = 45 ms
> >>>> Scan Time = 8 ms
> >>>> 127 MB
> >>>>
> >>>> --------------------------------------------------------------------------------------------
> >>>> X = 100k (1.1M events total)
> >>>> Test1:
> >>>> Flush Time = 60 ms
> >>>> Scan Time = 12 ms
> >>>> 678 MB
> >>>>
> >>>> Test2:
> >>>> Flush Time = 45 ms
> >>>> Scan Time = 7 ms
> >>>> 576 MB
> >>>>
> >>>> --------------------------------------------------------------------------------------------
> >>>> X = 1M (11M events total)
> >>>> Test1:
> >>>> Flush Time = 52 ms
> >>>> Scan Time = 19 ms
> >>>> 7.2 GB
> >>>>
> >>>> Test2:
> >>>> Flush Time = 84 ms
> >>>> Scan Time = 34 ms
> >>>> 9.1 GB
> >>>>
> >>>> --------------------------------------------------------------------------------------------
> >>>> X = 2.5M (27.5M events total)
> >>>> Test1:
> >>>> Flush Time = 82 ms
> >>>> Scan Time = 63 ms
> >>>> 17 GB - 276 sst files
> >>>>
> >>>> Test2:
> >>>> Flush Time = 116 ms
> >>>> Scan Time = 35 ms
> >>>> 23 GB - 361 sst files
> >>>>
> >>>> Test3:
> >>>> Flush Time = 103 ms
> >>>> Scan Time = 82 ms
> >>>> 19 GB - 300 sst files
> >>>>
> >>>> --------------------------------------------------------------------------------------------
> >>>>
> >>>> I had to limit my testing on my laptop to X = 2.5M events. I tried to go
> >>>> to X = 10M (110M events) but RocksDB was going into the 100GB+ range and my
> >>>> laptop ran out of disk. More extensive testing could be done but I suspect
> >>>> that it would be in line with what we're seeing in the results above.
> >>>>
> >>>> At this point in time, I think the only major discussion point is really
> >>>> around what Jan and I have disagreed on: repartitioning back + resolving
> >>>> potential out-of-order issues, or leaving that up to the client to handle.
> >>>>
> >>>>
> >>>> Thanks folks,
> >>>>
> >>>> Adam
> >>>>
> >>>>
> >>>> On Mon, Dec 3, 2018 at 4:34 AM Jan Filipiak <jan.filip...@trivago.com>
> >>>> wrote:
> >>>>
> >>>>>
> >>>>>
> >>>>> On 29.11.2018 15:14, John Roesler wrote:
> >>>>>> Hi all,
> >>>>>>
> >>>>>> Sorry that this discussion petered out... I think the 2.1 release caused an
> >>>>>> extended distraction that pushed it off everyone's radar (which was
> >>>>>> precisely Adam's concern). Personally, I've also had some extended
> >>>>>> distractions of my own that kept (and continue to keep) me preoccupied.
> >>>>>>
> >>>>>> However, calling for a vote did wake me up, so I guess Jan was on the
> >>>>>> right track!
> >>>>>>
> >>>>>> I've gone back and reviewed the whole KIP document and the prior
> >>>>>> discussion, and I'd like to offer a few thoughts:
> >>>>>>
> >>>>>> API Thoughts:
> >>>>>>
> >>>>>> 1. If I read the KIP right, you are proposing a many-to-one join. Could we
> >>>>>> consider naming it manyToOneJoin? Or, if you prefer, flip the design around
> >>>>>> and make it a oneToManyJoin?
> >>>>>>
> >>>>>> The proposed name "joinOnForeignKey" disguises the join type, and it seems
> >>>>>> like it might trick some people into using it for a one-to-one join. This
> >>>>>> would work, of course, but it would be super inefficient compared to a
> >>>>>> simple rekey-and-join.
> >>>>>>
> >>>>>> 2. I might have missed it, but I don't think it's specified whether it's an
> >>>>>> inner, outer, or left join. I'm guessing an outer join, as (neglecting IQ),
> >>>>>> the rest can be achieved by filtering or by handling it in the ValueJoiner.
> >>>>>>
> >>>>>> 3. The arg list to joinOnForeignKey doesn't look quite right.
> >>>>>> 3a. Regarding Serialized: There are a few different paradigms in play in
> >>>>>> the Streams API, so it's confusing, but instead of three Serialized args, I
> >>>>>> think it would be better to have one that allows (optionally) setting the 4
> >>>>>> incoming serdes. The result serde is defined by the Materialized. The
> >>>>>> incoming serdes can be optional because they might already be available on
> >>>>>> the source KTables, or the default serdes from the config might be
> >>>>>> applicable.
> >>>>>>
> >>>>>> 3b. Is the StreamPartitioner necessary? The other joins don't allow setting
> >>>>>> one, and it seems like it might actually be harmful, since the rekey
> >>>>>> operation needs to produce results that are co-partitioned with the "other"
> >>>>>> KTable.
> >>>>>>
> >>>>>> 4. I'm fine with the "reserved word" header, but I didn't actually follow
> >>>>>> what Matthias meant about namespacing requiring "deserializing" the record
> >>>>>> header. The headers are already Strings, so I don't think that
> >>>>>> deserialization is required. If we applied the namespace at source nodes
> >>>>>> and stripped it at sink nodes, this would be practically no overhead. The
> >>>>>> advantage of the namespace idea is that no public API change wrt headers
> >>>>>> needs to happen, and no restrictions need to be placed on users' headers.
> >>>>>>
> >>>>>> (Although I'm wondering if we can get away without the header at all...
> >>>>>> stay tuned)
> >>>>>>
> >>>>>> 5. I also didn't follow the discussion about the HWM table growing without
> >>>>>> bound. As I read it, the HWM table is effectively implementing OCC to
> >>>>>> resolve the problem you noted with disordering when the rekey is
> >>>>>> reversed... particularly notable when the FK changes. As such, it only
> >>>>>> needs to track the most recent "version" (the offset in the source
> >>>>>> partition) of each key. Therefore, it should have the same number of keys
> >>>>>> as the source table at all times.
> >>>>>>
> >>>>>> I see that you are aware of KIP-258, which I think might be relevant in a
> >>>>>> couple of ways. One: it's just about storing the timestamp in the state
> >>>>>> store, but the ultimate idea is to effectively use the timestamp as an OCC
> >>>>>> "version" to drop disordered updates. You wouldn't want to use the
> >>>>>> timestamp for this operation, but if you were to use a similar mechanism to
> >>>>>> store the source offset in the store alongside the re-keyed values, then
> >>>>>> you could avoid a separate table.
> >>>>>>
> >>>>>> 6. You and Jan have been thinking about this for a long time, so I've
> >>>>>> probably missed something here, but I'm wondering if we can avoid the HWM
> >>>>>> tracking at all and resolve out-of-order during a final join instead...
> >>>>>>
> >>>>>> Let's say we're joining a left table (Integer K: Letter FK, (other data))
> >>>>>> to a right table (Letter K: (some data)).
> >>>>>>
> >>>>>> Left table:
> >>>>>> 1: (A, xyz)
> >>>>>> 2: (B, asd)
> >>>>>>
> >>>>>> Right table:
> >>>>>> A: EntityA
> >>>>>> B: EntityB
> >>>>>>
> >>>>>> We could do a rekey as you proposed with a combined key, but not
> >>>>>> propagating the value at all..
> >>>>>> Rekey table:
> >>>>>> A-1: (dummy value)
> >>>>>> B-2: (dummy value)
> >>>>>>
> >>>>>> Which we then join with the right table to produce:
> >>>>>> A-1: EntityA
> >>>>>> B-2: EntityB
> >>>>>>
> >>>>>> Which gets rekeyed back:
> >>>>>> 1: A, EntityA
> >>>>>> 2: B, EntityB
> >>>>>>
> >>>>>> And finally we do the actual join:
> >>>>>> Result table:
> >>>>>> 1: ((A, xyz), EntityA)
> >>>>>> 2: ((B, asd), EntityB)
> >>>>>>
> >>>>>> The thing is that in that last join, we have the opportunity to compare the
> >>>>>> current FK in the left table with the incoming PK of the right table. If
> >>>>>> they don't match, we just drop the event, since it must be outdated.
> >>>>>>
> >>>>>
> >>>>>> In your KIP, you gave an example in which (1: A, xyz) gets updated to (1:
> >>>>>> B, xyz), ultimately yielding a conundrum about whether the final state
> >>>>>> should be (1: null) or (1: joined-on-B). With the algorithm above, you
> >>>>>> would be considering (1: (B, xyz), (A, null)) vs (1: (B, xyz), (B,
> >>>>>> EntityB)). It seems like this does give you enough information to make the
> >>>>>> right choice, regardless of disordering.
> >>>>>
> >>>>> Will check Adam's patch, but this should work. As mentioned often, I am
> >>>>> not convinced about partitioning back for the user automatically. I think
> >>>>> this is the real performance eater ;)
> >>>>>
> >>>>>>
> >>>>>>
> >>>>>> 7. Last thought... I'm a little concerned about the performance of the
> >>>>>> range scans when records change in the right table. You've said that you've
> >>>>>> been using the algorithm you presented in production for a while. Can you
> >>>>>> give us a sense of the performance characteristics you've observed?
> >>>>>>
> >>>>>
> >>>>> Make it work, make it fast, make it beautiful. The topmost thing here is
> >>>>> / was correctness. In practice I do not measure the performance of the
> >>>>> range scan. The usual cases I run this with emit 500k - 1M rows
> >>>>> on a left-hand-side change. The range scan is just the work you have to
> >>>>> do; also, when you pack your data into different formats, the RocksDB
> >>>>> performance is usually tightly tied to the size of the data, and we can't
> >>>>> really change that. It is more important for users to prevent useless
> >>>>> updates to begin with. My left-hand side is guarded to drop changes that
> >>>>> are not going to change my join output.
> >>>>>
> >>>>> Usually it's:
> >>>>>
> >>>>> drop unused fields and then don't forward if old.equals(new)
> >>>>>
> >>>>> Regarding the performance of creating an iterator for smaller
> >>>>> fanouts, users can still just do a group by first anyway.
> >>>>>
> >>>>>
> >>>>>
> >>>>>> I could only think of one alternative, but I'm not sure if it's better or
> >>>>>> worse... If the first re-key only needs to preserve the original key, as I
> >>>>>> proposed in #6, then we could store a vector of keys in the value:
> >>>>>>
> >>>>>> Left table:
> >>>>>> 1: A,...
> >>>>>> 2: B,...
> >>>>>> 3: A,...
> >>>>>>
> >>>>>> Gets re-keyed:
> >>>>>> A: [1, 3]
> >>>>>> B: [2]
> >>>>>>
> >>>>>> Then, the rhs part of the join would only need a regular single-key lookup.
> >>>>>> Of course we have to deal with the problem of large values, as there's no
> >>>>>> bound on the number of lhs records that can reference rhs records. Offhand,
> >>>>>> I'd say we could page the values, so when one row is past the threshold, we
> >>>>>> append the key for the next page. Then in most cases, it would be a single
> >>>>>> key lookup, but for large fan-out updates, it would be one per (max value
> >>>>>> size)/(avg lhs key size).
> >>>>>>
> >>>>>> This seems more complex, though... Plus, I think there's some extra
> >>>>>> tracking we'd need to do to know when to emit a retraction. For example,
> >>>>>> when record 1 is deleted, the re-key table would just have (A: [3]). Some
> >>>>>> kind of tombstone is needed so that the join result for 1 can also be
> >>>>>> retracted.
> >>>>>>
> >>>>>> That's all!
> >>>>>>
> >>>>>> Thanks so much to both Adam and Jan for the thoughtful KIP. Sorry the
> >>>>>> discussion has been slow.
> >>>>>> -John
> >>>>>>
> >>>>>>
> >>>>>> On Fri, Oct 12, 2018 at 2:20 AM Jan Filipiak <
> >>> jan.filip...@trivago.com>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> I'd say you can just call the vote.
> >>>>>>>
> >>>>>>> That happens all the time, and if something comes up, it just goes back
> >>>>>>> to discuss.
> >>>>>>>
> >>>>>>> I would not expect too much attention with yet another email in this
> >>>>>>> thread.
> >>>>>>>
> >>>>>>> best Jan
> >>>>>>>
> >>>>>>> On 09.10.2018 13:56, Adam Bellemare wrote:
> >>>>>>>> Hello Contributors
> >>>>>>>>
> >>>>>>>> I know that 2.1 is about to be released, but I do need to bump this to keep
> >>>>>>>> visibility up. I am still intending to push this through once contributor
> >>>>>>>> feedback is given.
> >>>>>>>>
> >>>>>>>> Main points that need addressing:
> >>>>>>>> 1) Any way (or benefit) in structuring the current singular graph node into
> >>>>>>>> multiple nodes? It has a whopping 25 parameters right now. I am a bit fuzzy
> >>>>>>>> on how the optimizations are supposed to work, so I would appreciate any
> >>>>>>>> help on this aspect.
> >>>>>>>>
> >>>>>>>> 2) Overall strategy for joining + resolving. This thread has much discourse
> >>>>>>>> between Jan and me about the current highwater mark proposal and a groupBy
> >>>>>>>> + reduce proposal. I am of the opinion that we need to strictly handle any
> >>>>>>>> chance of out-of-order data and leave none of it up to the consumer. Any
> >>>>>>>> comments or suggestions here would also help.
> >>>>>>>>
> >>>>>>>> 3) Anything else that you see that would prevent this from moving to a
> >>>>>>>> vote?
> >>>>>>>>
> >>>>>>>> Thanks
> >>>>>>>>
> >>>>>>>> Adam
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Sun, Sep 30, 2018 at 10:23 AM Adam Bellemare <
> >>>>>>> adam.bellem...@gmail.com>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Jan
> >>>>>>>>>
> >>>>>>>>> With the Stores.windowStoreBuilder and Stores.persistentWindowStore, you
> >>>>>>>>> actually only need to specify the number of segments you want and how large
> >>>>>>>>> they are. To the best of my understanding, what happens is that the
> >>>>>>>>> segments are automatically rolled over as new data with new timestamps is
> >>>>>>>>> created. We use this exact functionality in some of the work done
> >>>>>>>>> internally at my company. For reference, this is the hopping windowed store.
> >>>>>>>>>
> >>>>>>>>> https://kafka.apache.org/11/documentation/streams/developer-guide/dsl-api.html#id21
> >>>>>>>>>
> >>>>>>>>> In the code that I have provided, there are going to be two 24h segments.
> >>>>>>>>> When a record is put into the windowStore, it will be inserted at time T in
> >>>>>>>>> both segments. The two segments will always overlap by 12h. As time goes on
> >>>>>>>>> and new records are added (say at time T+12h+), the oldest segment will be
> >>>>>>>>> automatically deleted and a new segment created. The records are by default
> >>>>>>>>> inserted with the context.timestamp(), such that it is the record time, not
> >>>>>>>>> the clock time, which is used.
> >>>>>>>>>
> >>>>>>>>> To the best of my understanding, the timestamps are retained when
> >>>>>>>>> restoring from the changelog.
> >>>>>>>>>
> >>>>>>>>> Basically, this is a heavy-handed way to deal with TTL at a segment level,
> >>>>>>>>> instead of at an individual record level.
> >>>>>>>>>
> >>>>>>>>> On Tue, Sep 25, 2018 at 5:18 PM Jan Filipiak <
> >>>>> jan.filip...@trivago.com>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Will that work? I expected it to blow up with ClassCastException or
> >>>>>>>>>> similar.
> >>>>>>>>>>
> >>>>>>>>>> You would either have to specify the window you fetch/put or iterate
> >>>>>>>>>> across all windows the key was found in, right?
> >>>>>>>>>>
> >>>>>>>>>> I just hope the window store doesn't check stream-time under the hood;
> >>>>>>>>>> that would be a questionable interface.
> >>>>>>>>>>
> >>>>>>>>>> If it does: did you see my comment on checking all the windows earlier?
> >>>>>>>>>> That would be needed to actually give reasonable time guarantees.
> >>>>>>>>>>
> >>>>>>>>>> Best
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On 25.09.2018 13:18, Adam Bellemare wrote:
> >>>>>>>>>>> Hi Jan
> >>>>>>>>>>>
> >>>>>>>>>>> Check for  " highwaterMat " in the PR. I only changed the state store,
> >>>>>>>>>>> not the ProcessorSupplier.
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks,
> >>>>>>>>>>> Adam
> >>>>>>>>>>>
> >>>>>>>>>>> On Mon, Sep 24, 2018 at 2:47 PM, Jan Filipiak <
> >>>>>>> jan.filip...@trivago.com
> >>>>>>>>>>>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 24.09.2018 16:26, Adam Bellemare wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> @Guozhang
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks for the information. This is indeed something that will be
> >>>>>>>>>>>>> extremely useful for this KIP.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> @Jan
> >>>>>>>>>>>>> Thanks for your explanations. That being said, I will not be moving ahead
> >>>>>>>>>>>>> with an implementation using the reshuffle/groupBy solution as you propose.
> >>>>>>>>>>>>> However, if you wish to implement it yourself off of my current PR
> >>>>>>>>>>>>> and submit it as a competitive alternative, I would be more than happy to
> >>>>>>>>>>>>> help vet that as an alternate solution. As it stands right now, I do not
> >>>>>>>>>>>>> really have more time to invest into alternatives without there being a
> >>>>>>>>>>>>> strong indication from the binding voters which they would prefer.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>> Hey, total no worries. I think I personally gave up on the streams DSL for
> >>>>>>>>>>>> some time already, otherwise I would have pulled this KIP through already.
> >>>>>>>>>>>> I am currently reimplementing my own DSL based on PAPI.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>> I will look at finishing up my PR with the windowed state store in the
> >>>>>>>>>>>>> next week or so, exercising it via tests, and then I will come back for
> >>>>>>>>>>>>> final discussions. In the meantime, I hope that any of the binding voters
> >>>>>>>>>>>>> could take a look at the KIP in the wiki. I have updated it according to
> >>>>>>>>>>>>> the latest plan:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I have also updated the KIP PR to use a windowed store. This could be
> >>>>>>>>>>>>> replaced by the results of KIP-258 whenever they are completed.
> >>>>>>>>>>>>> https://github.com/apache/kafka/pull/5527
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Adam
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> Is the HighWatermarkResolverProccessorsupplier already updated in the PR?
> >>>>>>>>>>>> I expected it to change to Windowed<K>,Long. Am I missing something?
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Fri, Sep 14, 2018 at 2:24 PM, Guozhang Wang <
> >>>>> wangg...@gmail.com>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Correction on my previous email: KAFKA-5533 is the wrong link, as it is
> >>>>>>>>>>>>>> for corresponding changelog mechanisms. But as part of KIP-258 we do want
> >>>>>>>>>>>>>> to have "handling out-of-order data for source KTable" such that instead of
> >>>>>>>>>>>>>> blindly applying the updates to the materialized store, i.e. following
> >>>>>>>>>>>>>> offset ordering, we will reject updates that are older than the current
> >>>>>>>>>>>>>> key's timestamps, i.e. following timestamp ordering.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Fri, Sep 14, 2018 at 11:21 AM, Guozhang Wang <
> >>>>>>> wangg...@gmail.com>
> >>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hello Adam,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks for the explanation. Regarding the final step (i.e. the high
> >>>>>>>>>>>>>>> watermark store, now altered to be replaced with a window store), I
> >>>>>>>>>>>>>>> think another current on-going KIP may actually help:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> This is for adding the timestamp into a key-value store (i.e. only for
> >>>>>>>>>>>>>>> non-windowed KTable), and then one of its usages, as described in
> >>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-5533, is that we can then
> >>>>>>>>>>>>>>> "reject" updates from the source topics if their timestamp is smaller than
> >>>>>>>>>>>>>>> the current key's latest update timestamp. I think it is very similar to
> >>>>>>>>>>>>>>> what you have in mind for high watermark based filtering, while you only
> >>>>>>>>>>>>>>> need to make sure that the timestamps of the joining records are correctly
> >>>>>>>>>>>>>>> inherited through the whole topology to the final stage.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Note that this KIP is for key-value store and hence non-windowed KTables
> >>>>>>>>>>>>>>> only, but for windowed KTables we do not really have a good support for
> >>>>>>>>>>>>>>> their joins anyways (https://issues.apache.org/jira/browse/KAFKA-7107). I
> >>>>>>>>>>>>>>> think we can just consider non-windowed KTable-KTable non-key joins for
> >>>>>>>>>>>>>>> now. In which case, KIP-258 should help.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Wed, Sep 12, 2018 at 9:20 PM, Jan Filipiak <
> >>>>>>>>>> jan.filip...@trivago.com
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On 11.09.2018 18:00, Adam Bellemare wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hi Guozhang
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Current highwater mark implementation would grow
> endlessly
> >>>>> based
> >>>>>>>>>> on
> >>>>>>>>>>>>>>>>> primary key of original event. It is a pair of (<this
> >>> table
> >>>>>>>>>> primary
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> key>,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> <highest offset seen for that key>). This is used to
> >>>>> differentiate
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> between
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> late arrivals and new updates. My newest proposal would be
> >>> to
> >>>>>>>>>> replace
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> with a Windowed state store of Duration N. This would allow
> >>> the
> >>>>>>> same
> >>>>>>>>>>>>>>>>> behaviour, but cap the size based on time. This should
> >>> allow
> >>>>> for
> >>>>>>>>>> all
> >>>>>>>>>>>>>>>>> late-arriving events to be processed, and should be
> >>>>> customizable
> >>>>>>>>>> by
> >>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> user to tailor to their own needs (ie: perhaps just 10
> >>>>> minutes
> >>>>>>> of
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> window,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> or perhaps 7 days...).
> >>>>>>>>>>>>>>>>>
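A rough sketch of the highwater mark idea with a time cap, in plain Java. Everything here is an assumption made for illustration: the class HighWaterMarkFilter, the retentionMs parameter, and the use of a HashMap with manual expiry in place of a real windowed state store. It keeps (key, highest offset seen) pairs, rejects lower offsets as late arrivals, and forgets keys that have not been updated within the retention period.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; a HashMap with expiry stands in for a windowed store.
final class HighWaterMarkFilter<K> {

    // Highest offset seen for a key, plus when that offset was recorded.
    private static final class Mark {
        final long offset;
        final long lastSeenMs;

        Mark(long offset, long lastSeenMs) {
            this.offset = offset;
            this.lastSeenMs = lastSeenMs;
        }
    }

    private final Map<K, Mark> marks = new HashMap<>();
    private final long retentionMs;

    HighWaterMarkFilter(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    // Returns true for a new update, false for a late arrival (lower offset).
    boolean accept(K key, long offset, long nowMs) {
        expire(nowMs);
        Mark mark = marks.get(key);
        if (mark != null && offset < mark.offset) {
            return false;
        }
        marks.put(key, new Mark(offset, nowMs));
        return true;
    }

    // Drops marks older than the retention period so the state stays bounded by time.
    void expire(long nowMs) {
        marks.values().removeIf(m -> nowMs - m.lastSeenMs > retentionMs);
    }

    int size() {
        return marks.size();
    }
}
```

The trade-off shows up directly: once a mark has expired, a late record for that key is accepted again, so out-of-order arrival is only resolved within the retention period.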
> >>>>>>>>>>>>>>>>> Hi Adam, using time based retention can do the trick
> here.
> >>>>> Even
> >>>>>>>>>> if I
> >>>>>>>>>>>>>>>> would still like to see the automatic repartitioning
> >>> optional
> >>>>>>>>>> since I
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> would
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> just reshuffle again. With windowed store I am a little bit
> >>>>>>>>>> sceptical
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> about
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> how to determine the window. So essentially one could run
> >>> into
> >>>>>>>>>> problems
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> when
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> the rapid change happens near a window border. I will check
> >>> your
> >>>>>>>>>>>>>>>> implementation in detail; if it's problematic, we could
> >>> still
> >>>>>>> check
> >>>>>>>>>>>>>>>> _all_
> >>>>>>>>>>>>>>>> windows on read with not too bad performance impact I
> guess.
> >>>>> Will
> >>>>>>>>>> let
> >>>>>>>>>>>>>>>> you
> >>>>>>>>>>>>>>>> know if the implementation would be correct as is. I
> >>> would
> >>>>> not
> >>>>>>>>>> like
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> assume that: offset(A) < offset(B) => timestamp(A)  <
> >>>>>>> timestamp(B).
> >>>>>>>>>> I
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> think
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> we can't expect that.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> @Jan
> >>>>>>>>>>>>>>>>> I believe I understand what you mean now - thanks for the
> >>>>>>>>>> diagram, it
> >>>>>>>>>>>>>>>>> did really help. You are correct that I do not have the
> >>>>> original
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> primary
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> key available, and I can see that if it was available then
> >>> you
> >>>>>>>>>> would be
> >>>>>>>>>>>>>>>>> able to add and remove events from the Map. That being
> >>> said,
> >>>>> I
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> encourage
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> you to finish your diagrams / charts just for clarity for
> >>>>> everyone
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> else.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Yeah 100%, this giphy thing is just really hard work. But
> >>> I
> >>>>>>>>>> understand
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> the benefits for the rest. Sorry about the original
> primary
> >>>>> key,
> >>>>>>> We
> >>>>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>> join and Group by implemented our own in PAPI and
> basically
> >>>>> not
> >>>>>>>>>> using
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> any
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> DSL (Just the abstraction). Completely missed that in
> >>> original
> >>>>> DSL
> >>>>>>>>>> its
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> not
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> there and just assumed it. total brain mess up on my end.
> >>> Will
> >>>>>>>>>> finish
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> chart as soon as I get a quiet evening this week.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> My follow up question for you is, won't the Map stay
> inside
> >>>>> the
> >>>>>>>>>> State
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Store indefinitely after all of the changes have
> >>> propagated?
> >>>>>>> Isn't
> >>>>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>> effectively the same as a highwater mark state store?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thing is that if the map is empty, subtractor is gonna
> >>>>> return
> >>>>>>>>>> `null`
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> the key is removed from the keyspace. But there is going to
> >>> be
> >>>>> a
> >>>>>>>>>> store
> >>>>>>>>>>>>>>>> 100%, the good thing is that I can use this store directly
> >>> for
> >>>>>>>>>>>>>>>> materialize() / enableSendingOldValues() is a regular
> >>> store,
> >>>>>>>>>> satisfying
> >>>>>>>>>>>>>>>> all guarantees needed for further groupby / join. The
> >>> Windowed
> >>>>>>>>>> store is
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> not
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> keeping the values, so for the next stateful operation we
> >>>>> would
> >>>>>>>>>>>>>>>> need to instantiate an extra store. or we have the window
> >>>>> store
> >>>>>>>>>> also
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> the values then.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Long story short. if we can flip in a custom group by
> >>> before
> >>>>>>>>>>>>>>>> repartitioning to the original primary key i think it
> would
> >>>>> help
> >>>>>>>>>> the
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> users
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> big time in building efficient apps. Given the original
> >>> primary
> >>>>>>> key
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> issue I
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> understand that we do not have a solid foundation to build
> >>> on.
> >>>>>>>>>>>>>>>> Leaving primary key carry along to the user. very
> >>>>> unfortunate. I
> >>>>>>>>>> could
> >>>>>>>>>>>>>>>> understand the decision goes like that. I do not think it's
> >>> a
> >>>>> good
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> decision.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thanks
> >>>>>>>>>>>>>>>>> Adam
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Tue, Sep 11, 2018 at 10:07 AM, Prajakta Dumbre <
> >>>>>>>>>>>>>>>>> dumbreprajakta...@gmail.com <mailto:
> >>>>> dumbreprajakta...@gmail.com
> >>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>          please remove me from this group
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>          On Tue, Sep 11, 2018 at 1:29 PM Jan Filipiak
> >>>>>>>>>>>>>>>>>          <jan.filip...@trivago.com <mailto:
> >>>>>>> jan.filip...@trivago.com
> >>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>          wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>          > Hi Adam,
> >>>>>>>>>>>>>>>>>          >
> >>>>>>>>>>>>>>>>>          > give me some time, will make such a chart.
> last
> >>>>> time i
> >>>>>>>>>> didn't
> >>>>>>>>>>>>>>>>>          get along
> >>>>>>>>>>>>>>>>>          > well with giphy and ruined all your charts.
> >>>>>>>>>>>>>>>>>          > Hopefully i can get it done today
> >>>>>>>>>>>>>>>>>          >
> >>>>>>>>>>>>>>>>>          > On 08.09.2018 16:00, Adam Bellemare wrote:
> >>>>>>>>>>>>>>>>>          > > Hi Jan
> >>>>>>>>>>>>>>>>>          > >
> >>>>>>>>>>>>>>>>>          > > I have included a diagram of what I
> attempted
> >>> on
> >>>>> the
> >>>>>>>>>> KIP.
> >>>>>>>>>>>>>>>>>          > >
> >>>>>>>>>>>>>>>>>          >
> >>>>>>>>>>>>>>>>>          > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-GroupBy+Reduce/Aggregate
> >>>>>>>>>>>>>>>>>          > >
> >>>>>>>>>>>>>>>>>          > > I attempted this back at the start of my own
> >>>>>>>>>> implementation
> >>>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>          this
> >>>>>>>>>>>>>>>>>          > > solution, and since I could not get it to
> >>> work I
> >>>>> have
> >>>>>>>>>> since
> >>>>>>>>>>>>>>>>>          discarded the
> >>>>>>>>>>>>>>>>>          > > code. At this point in time, if you wish to
> >>>>> continue
> >>>>>>>>>> pursuing
> >>>>>>>>>>>>>>>>>          for your
> >>>>>>>>>>>>>>>>>          > > groupBy solution, I ask that you please
> >>> create a
> >>>>>>>>>> diagram on
> >>>>>>>>>>>>>>>>>          the KIP
> >>>>>>>>>>>>>>>>>          > > carefully explaining your solution. Please
> >>> feel
> >>>>> free
> >>>>>>> to
> >>>>>>>>>> use
> >>>>>>>>>>>>>>>>>          the image I
> >>>>>>>>>>>>>>>>>          > > just posted as a starting point. I am having
> >>>>> trouble
> >>>>>>>>>>>>>>>>>          understanding your
> >>>>>>>>>>>>>>>>>          > > explanations but I think that a carefully
> >>>>> constructed
> >>>>>>>>>> diagram
> >>>>>>>>>>>>>>>>>          will clear
> >>>>>>>>>>>>>>>>>          > up
> >>>>>>>>>>>>>>>>>          > > any misunderstandings. Alternately, please
> >>> post a
> >>>>>>>>>>>>>>>>>          comprehensive PR with
> >>>>>>>>>>>>>>>>>          > > your solution. I can only guess at what you
> >>>>> mean, and
> >>>>>>>>>> since I
> >>>>>>>>>>>>>>>>>          value my
> >>>>>>>>>>>>>>>>>          > own
> >>>>>>>>>>>>>>>>>          > > time as much as you value yours, I believe
> it
> >>> is
> >>>>> your
> >>>>>>>>>>>>>>>>>          responsibility to
> >>>>>>>>>>>>>>>>>          > > provide an implementation instead of me
> >>> trying to
> >>>>>>> guess.
> >>>>>>>>>>>>>>>>>          > >
> >>>>>>>>>>>>>>>>>          > > Adam
> >>>>>>>>>>>>>>>>>          > >
> >>>>>>>>>>>>>>>>>          > >
> >>>>>>>>>>>>>>>>>          > >
> >>>>>>>>>>>>>>>>>          > >
> >>>>>>>>>>>>>>>>>          > >
> >>>>>>>>>>>>>>>>>          > >
> >>>>>>>>>>>>>>>>>          > >
> >>>>>>>>>>>>>>>>>          > >
> >>>>>>>>>>>>>>>>>          > >
> >>>>>>>>>>>>>>>>>          > > On Sat, Sep 8, 2018 at 8:00 AM, Jan Filipiak
> >>>>>>>>>>>>>>>>>          <jan.filip...@trivago.com <mailto:
> >>>>>>> jan.filip...@trivago.com
> >>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>          > > wrote:
> >>>>>>>>>>>>>>>>>          > >
> >>>>>>>>>>>>>>>>>          > >> Hi James,
> >>>>>>>>>>>>>>>>>          > >>
> >>>>>>>>>>>>>>>>>          > >> nice to see you being interested. Kafka
> >>>>> streams at
> >>>>>>>>>> this
> >>>>>>>>>>>>>>>>>          point supports
> >>>>>>>>>>>>>>>>>          > >> all sorts of joins as long as both streams
> >>> have
> >>>>> the
> >>>>>>>>>> same
> >>>>>>>>>>>>>>>>> key.
> >>>>>>>>>>>>>>>>>          > >> Adam is currently implementing a join
> where a
> >>>>> KTable
> >>>>>>>>>> and a
> >>>>>>>>>>>>>>>>>          KTable can
> >>>>>>>>>>>>>>>>>          > have
> >>>>>>>>>>>>>>>>>          > >> a one-to-many relationship (1:n). We
> exploit
> >>>>> that
> >>>>>>>>>> rocksdb
> >>>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>          > >> datastore that keeps data sorted (At least
> >>>>> exposes an
> >>>>>>>>>> API to
> >>>>>>>>>>>>>>>>>          access the
> >>>>>>>>>>>>>>>>>          > >> stored data in a sorted fashion).
> >>>>>>>>>>>>>>>>>          > >>
> >>>>>>>>>>>>>>>>>          > >> I think the technical caveats are well
> >>>>> understood
> >>>>>>> now
> >>>>>>>>>> and we
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> are
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>          > basically
> >>>>>>>>>>>>>>>>>          > >> down to philosophy and API Design ( when
> Adam
> >>>>> sees
> >>>>>>> my
> >>>>>>>>>> newest
> >>>>>>>>>>>>>>>>>          message).
> >>>>>>>>>>>>>>>>>          > >> I have a lengthy track record of losing
> >>> those
> >>>>> kinda
> >>>>>>>>>>>>>>>>>          arguments within
> >>>>>>>>>>>>>>>>>          > the
> >>>>>>>>>>>>>>>>>          > >> streams community and I have no clue why.
> So
> >>> I
> >>>>>>>>>> literally
> >>>>>>>>>>>>>>>>>          can't wait for
> >>>>>>>>>>>>>>>>>          > you
> >>>>>>>>>>>>>>>>>          > >> to churn through this thread and give your
> >>>>> opinion on
> >>>>>>>>>> how we
> >>>>>>>>>>>>>>>>>          should
> >>>>>>>>>>>>>>>>>          > design
> >>>>>>>>>>>>>>>>>          > >> the return type of the oneToManyJoin and
> how
> >>>>> much
> >>>>>>>>>> power we
> >>>>>>>>>>>>>>>>>          want to give
> >>>>>>>>>>>>>>>>>          > to
> >>>>>>>>>>>>>>>>>          > >> the user vs "simplicity" (where simplicity
> >>> isn't
> >>>>>>>>>> really that
> >>>>>>>>>>>>>>>>>          as users
> >>>>>>>>>>>>>>>>>          > still
> >>>>>>>>>>>>>>>>>          > >> need to understand it I argue)
> >>>>>>>>>>>>>>>>>          > >>
> >>>>>>>>>>>>>>>>>          > >> waiting for you to join in on the
> discussion
> >>>>>>>>>>>>>>>>>          > >>
> >>>>>>>>>>>>>>>>>          > >> Best Jan
> >>>>>>>>>>>>>>>>>          > >>
> >>>>>>>>>>>>>>>>>          > >>
> >>>>>>>>>>>>>>>>>          > >>
> >>>>>>>>>>>>>>>>>          > >> On 07.09.2018 15:49, James Kwan wrote:
> >>>>>>>>>>>>>>>>>          > >>
> >>>>>>>>>>>>>>>>>          > >>> I am new to this group and I found this
> >>> subject
> >>>>>>>>>>>>>>>>>          interesting.  Sounds
> >>>>>>>>>>>>>>>>>          > like
> >>>>>>>>>>>>>>>>>          > >>> you guys want to implement a join table of
> >>> two
> >>>>>>>>>> streams? Is
> >>>>>>>>>>>>>>>>> there
> >>>>>>>>>>>>>>>>>          > somewhere
> >>>>>>>>>>>>>>>>>          > >>> I can see the original requirement or
> >>> proposal?
> >>>>>>>>>>>>>>>>>          > >>>
> >>>>>>>>>>>>>>>>>          > >>> On Sep 7, 2018, at 8:13 AM, Jan Filipiak
> >>>>>>>>>>>>>>>>>          <jan.filip...@trivago.com <mailto:
> >>>>>>> jan.filip...@trivago.com
> >>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>> wrote:
> >>>>>>>>>>>>>>>>>          > >>>>
> >>>>>>>>>>>>>>>>>          > >>>>
> >>>>>>>>>>>>>>>>>          > >>>> On 05.09.2018 22:17, Adam Bellemare
> wrote:
> >>>>>>>>>>>>>>>>>          > >>>>
> >>>>>>>>>>>>>>>>>          > >>>>> I'm currently testing using a Windowed
> >>> Store
> >>>>> to
> >>>>>>>>>> store the
> >>>>>>>>>>>>>>>>>          highwater
> >>>>>>>>>>>>>>>>>          > >>>>> mark.
> >>>>>>>>>>>>>>>>>          > >>>>> By all indications this should work
> fine,
> >>>>> with
> >>>>>>> the
> >>>>>>>>>> caveat
> >>>>>>>>>>>>>>>>>          being that
> >>>>>>>>>>>>>>>>>          > it
> >>>>>>>>>>>>>>>>>          > >>>>> can
> >>>>>>>>>>>>>>>>>          > >>>>> only resolve out-of-order arrival for up
> >>> to
> >>>>> the
> >>>>>>>>>> size of
> >>>>>>>>>>>>>>>>>          the window
> >>>>>>>>>>>>>>>>>          > (ie:
> >>>>>>>>>>>>>>>>>          > >>>>> 24h, 72h, etc). This would remove the
> >>>>> possibility
> >>>>>>>>>> of it
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> being
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>          > unbounded
> >>>>>>>>>>>>>>>>>          > >>>>> in
> >>>>>>>>>>>>>>>>>          > >>>>> size.
> >>>>>>>>>>>>>>>>>          > >>>>>
> >>>>>>>>>>>>>>>>>          > >>>>> With regards to Jan's suggestion, I
> >>> believe
> >>>>> this
> >>>>>>> is
> >>>>>>>>>> where
> >>>>>>>>>>>>>>>>>          we will
> >>>>>>>>>>>>>>>>>          > have
> >>>>>>>>>>>>>>>>>          > >>>>> to
> >>>>>>>>>>>>>>>>>          > >>>>> remain in disagreement. While I do not
> >>>>> disagree
> >>>>>>>>>> with your
> >>>>>>>>>>>>>>>>>          statement
> >>>>>>>>>>>>>>>>>          > >>>>> about
> >>>>>>>>>>>>>>>>>          > >>>>> there likely to be additional joins done
> >>> in a
> >>>>>>>>>> real-world
> >>>>>>>>>>>>>>>>>          workflow, I
> >>>>>>>>>>>>>>>>>          > do
> >>>>>>>>>>>>>>>>>          > >>>>> not
> >>>>>>>>>>>>>>>>>          > >>>>> see how you can conclusively deal with
> >>>>>>> out-of-order
> >>>>>>>>>>>>>>>>> arrival
> >>>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>          > >>>>> foreign-key
> >>>>>>>>>>>>>>>>>          > >>>>> changes and subsequent joins. I have
> >>>>> attempted
> >>>>>>> what
> >>>>>>>>>> I
> >>>>>>>>>>>>>>>>>          think you have
> >>>>>>>>>>>>>>>>>          > >>>>> proposed (without a high-water, using
> >>>>> groupBy and
> >>>>>>>>>> reduce)
> >>>>>>>>>>>>>>>>>          and found
> >>>>>>>>>>>>>>>>>          > >>>>> that if
> >>>>>>>>>>>>>>>>>          > >>>>> the foreign key changes too quickly, or
> >>> the
> >>>>> load
> >>>>>>> on
> >>>>>>>>>> a
> >>>>>>>>>>>>>>>>>          stream thread
> >>>>>>>>>>>>>>>>>          > is
> >>>>>>>>>>>>>>>>>          > >>>>> too
> >>>>>>>>>>>>>>>>>          > >>>>> high, the joined messages will arrive
> >>>>>>> out-of-order
> >>>>>>>>>> and be
> >>>>>>>>>>>>>>>>>          incorrectly
> >>>>>>>>>>>>>>>>>          > >>>>> propagated, such that an intermediate
> >>> event
> >>>>> is
> >>>>>>>>>>>>>>>>> represented
> >>>>>>>>>>>>>>>>>          as the
> >>>>>>>>>>>>>>>>>          > final
> >>>>>>>>>>>>>>>>>          > >>>>> event.
> >>>>>>>>>>>>>>>>>          > >>>>>
> >>>>>>>>>>>>>>>>>          > >>>> Can you shed some light on your groupBy
> >>>>>>>>>> implementation.
> >>>>>>>>>>>>>>>>>          There must be
> >>>>>>>>>>>>>>>>>          > >>>> some sort of flaw in it.
> >>>>>>>>>>>>>>>>>          > >>>> I have a suspicion where it is, I would
> >>> just
> >>>>> like
> >>>>>>> to
> >>>>>>>>>>>>>>>>>          confirm. The idea
> >>>>>>>>>>>>>>>>>          > >>>> is bullet proof and it must be
> >>>>>>>>>>>>>>>>>          > >>>> an implementation mess up. I would like
> to
> >>>>> clarify
> >>>>>>>>>> before
> >>>>>>>>>>>>>>>>>          we draw a
> >>>>>>>>>>>>>>>>>          > >>>> conclusion.
> >>>>>>>>>>>>>>>>>          > >>>>
> >>>>>>>>>>>>>>>>>          > >>>>    Repartitioning the scattered events
> >>> back to
> >>>>>>> their
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> original
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>          > >>>>> partitions is the only way I know how to
> >>>>>>> conclusively
> >>>>>>>>>> deal
> >>>>>>>>>>>>>>>>>          with
> >>>>>>>>>>>>>>>>>          > >>>>> out-of-order events in a given time
> frame,
> >>>>> and to
> >>>>>>>>>> ensure
> >>>>>>>>>>>>>>>>>          that the
> >>>>>>>>>>>>>>>>>          > data
> >>>>>>>>>>>>>>>>>          > >>>>> is
> >>>>>>>>>>>>>>>>>          > >>>>> eventually consistent with the input
> >>> events.
> >>>>>>>>>>>>>>>>>          > >>>>>
> >>>>>>>>>>>>>>>>>          > >>>>> If you have some code to share that
> >>>>> illustrates
> >>>>>>> your
> >>>>>>>>>>>>>>>>>          approach, I
> >>>>>>>>>>>>>>>>>          > would
> >>>>>>>>>>>>>>>>>          > >>>>> be
> >>>>>>>>>>>>>>>>>          > >>>>> very grateful as it would remove any
> >>>>>>>>>> misunderstandings
> >>>>>>>>>>>>>>>>>          that I may
> >>>>>>>>>>>>>>>>>          > have.
> >>>>>>>>>>>>>>>>>          > >>>>>
> >>>>>>>>>>>>>>>>>          > >>>> ah okay you were looking for my code. I
> >>> don't
> >>>>> have
> >>>>>>>>>>>>>>>>>          something easily
> >>>>>>>>>>>>>>>>>          > >>>> readable here as it's bloated with
> >>> OO-patterns.
> >>>>>>>>>>>>>>>>>          > >>>>
> >>>>>>>>>>>>>>>>>          > >>>> it's anyhow trivial:
> >>>>>>>>>>>>>>>>>          > >>>>
> >>>>>>>>>>>>>>>>>          > >>>> @Override
> >>>>>>>>>>>>>>>>>          > >>>>      public T apply(K aggKey, V value, T aggregate)
> >>>>>>>>>>>>>>>>>          > >>>>      {
> >>>>>>>>>>>>>>>>>          > >>>>          Map<U, V> currentStateAsMap = asMap(aggregate); << imaginary
> >>>>>>>>>>>>>>>>>          > >>>>          U toModifyKey = mapper.apply(value);
> >>>>>>>>>>>>>>>>>          > >>>>              << this is the place where
> >>> people
> >>>>>>>>>> actually
> >>>>>>>>>>>>>>>>>          gonna have
> >>>>>>>>>>>>>>>>>          > issues
> >>>>>>>>>>>>>>>>>          > >>>> and why you probably couldn't do it. we
> >>> would
> >>>>> need
> >>>>>>>>>> to find
> >>>>>>>>>>>>>>>>>          a solution
> >>>>>>>>>>>>>>>>>          > here.
> >>>>>>>>>>>>>>>>>          > >>>> I didn't realize that yet.
> >>>>>>>>>>>>>>>>>          > >>>>              << we propagate the field in
> >>> the
> >>>>>>>>>> joiner, so
> >>>>>>>>>>>>>>>>>          that we can
> >>>>>>>>>>>>>>>>>          > pick
> >>>>>>>>>>>>>>>>>          > >>>> it up in an aggregate. Probably you have
> >>> not
> >>>>>>> thought
> >>>>>>>>>> of
> >>>>>>>>>>>>>>>>>          this in your
> >>>>>>>>>>>>>>>>>          > >>>> approach right?
> >>>>>>>>>>>>>>>>>          > >>>>              << I am very open to find a
> >>>>> generic
> >>>>>>>>>> solution
> >>>>>>>>>>>>>>>>>          here. In my
> >>>>>>>>>>>>>>>>>          > >>>> honest opinion this is broken in
> >>>>>>> KTableImpl.GroupBy
> >>>>>>>>>> that
> >>>>>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>          loses
> >>>>>>>>>>>>>>>>>          > the keys
> >>>>>>>>>>>>>>>>>          > >>>> and only maintains the aggregate key.
> >>>>>>>>>>>>>>>>>          > >>>>              << I abstracted it away back
> >>>>> then way
> >>>>>>>>>> before
> >>>>>>>>>>>>>>>>> i
> >>>>>>>>>>>>>>>>> was
> >>>>>>>>>>>>>>>>>          > thinking
> >>>>>>>>>>>>>>>>>          > >>>> of oneToMany join. That is why I didn't
> >>>>> realize
> >>>>>>> its
> >>>>>>>>>>>>>>>>>          significance here.
> >>>>>>>>>>>>>>>>>          > >>>>              << Opinions?
> >>>>>>>>>>>>>>>>>          > >>>>
> >>>>>>>>>>>>>>>>>          > >>>>          for (V m : current)
> >>>>>>>>>>>>>>>>>          > >>>>          {
> >>>>>>>>>>>>>>>>>          > >>>>              currentStateAsMap.put(mapper.apply(m), m);
> >>>>>>>>>>>>>>>>>          > >>>>          }
> >>>>>>>>>>>>>>>>>          > >>>>          if (isAdder)
> >>>>>>>>>>>>>>>>>          > >>>>          {
> >>>>>>>>>>>>>>>>>          > >>>>              currentStateAsMap.put(toModifyKey, value);
> >>>>>>>>>>>>>>>>>          > >>>>          }
> >>>>>>>>>>>>>>>>>          > >>>>          else
> >>>>>>>>>>>>>>>>>          > >>>>          {
> >>>>>>>>>>>>>>>>>          > >>>>              currentStateAsMap.remove(toModifyKey);
> >>>>>>>>>>>>>>>>>          > >>>>              if (currentStateAsMap.isEmpty()) {
> >>>>>>>>>>>>>>>>>          > >>>>                  return null;
> >>>>>>>>>>>>>>>>>          > >>>>              }
> >>>>>>>>>>>>>>>>>          > >>>>          }
> >>>>>>>>>>>>>>>>>          > >>>>          return asAggregateType(currentStateAsMap);
> >>>>>>>>>>>>>>>>>          > >>>>      }
> >>>>>>>>>>>>>>>>>          > >>>>
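For anyone who wants to run the idea quoted above, here is a self-contained sketch. It is an interpretation, not Jan's actual code: the class name MapAggregator is invented, the aggregate type is simply a Map instead of going through asMap / asAggregateType, and the mapper is assumed to extract the original primary key from the value (which is exactly the part the DSL does not provide today).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical illustration of a map-based adder/subtractor pair for a groupBy aggregate.
final class MapAggregator<U, V> {

    private final Function<V, U> mapper; // assumed: value -> original primary key
    private final boolean isAdder;       // true = adder, false = subtractor

    MapAggregator(Function<V, U> mapper, boolean isAdder) {
        this.mapper = mapper;
        this.isAdder = isAdder;
    }

    // Returns the new aggregate, or null when a subtraction empties the map
    // so that the key is removed from the keyspace.
    Map<U, V> apply(V value, Map<U, V> aggregate) {
        Map<U, V> state = aggregate == null ? new HashMap<>() : new HashMap<>(aggregate);
        U toModifyKey = mapper.apply(value);
        if (isAdder) {
            state.put(toModifyKey, value);
        } else {
            state.remove(toModifyKey);
            if (state.isEmpty()) {
                return null;
            }
        }
        return state;
    }
}
```

Copying the map on every call keeps the sketch side-effect free; a real implementation would more likely mutate and re-serialize the stored aggregate.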
> >>>>>>>>>>>>>>>>>          > >>>>
> >>>>>>>>>>>>>>>>>          > >>>>
> >>>>>>>>>>>>>>>>>          > >>>>
> >>>>>>>>>>>>>>>>>          > >>>>
> >>>>>>>>>>>>>>>>>          > >>>> Thanks,
> >>>>>>>>>>>>>>>>>          > >>>>> Adam
> >>>>>>>>>>>>>>>>>          > >>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>
> >>>>>>>>>>>>>>>>>          > >>>>> On Wed, Sep 5, 2018 at 3:35 PM, Jan
> >>> Filipiak
> >>>>> <
> >>>>>>>>>>>>>>>>>          > jan.filip...@trivago.com <mailto:
> >>>>>>> jan.filip...@trivago.com
> >>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>> wrote:
> >>>>>>>>>>>>>>>>>          > >>>>>
> >>>>>>>>>>>>>>>>>          > >>>>> Thanks Adam for bringing Matthias to
> >>> speed!
> >>>>>>>>>>>>>>>>>          > >>>>>> about the differences. I think
> re-keying
> >>>>> back
> >>>>>>>>>> should be
> >>>>>>>>>>>>>>>>>          optional at
> >>>>>>>>>>>>>>>>>          > >>>>>> best.
> >>>>>>>>>>>>>>>>>          > >>>>>> I would say we return a KScatteredTable
> >>> with
> >>>>>>>>>> reshuffle()
> >>>>>>>>>>>>>>>>>          returning
> >>>>>>>>>>>>>>>>>          > >>>>>> KTable<originalKey,Joined> to make the
> >>>>> backwards
> >>>>>>>>>>>>>>>>>          repartitioning
> >>>>>>>>>>>>>>>>>          > >>>>>> optional.
> >>>>>>>>>>>>>>>>>          > >>>>>> I am also in a big favour of doing the
> >>> out
> >>>>> of
> >>>>>>> order
> >>>>>>>>>>>>>>>>>          processing using
> >>>>>>>>>>>>>>>>>          > >>>>>> group
> >>>>>>>>>>>>>>>>>          > >>>>>> by instead of high water mark tracking.
> >>>>>>>>>>>>>>>>>          > >>>>>> Just because unbounded growth is just
> >>> scary
> >>>>> + It
> >>>>>>>>>> saves
> >>>>>>>>>>>>>>>>> us
> >>>>>>>>>>>>>>>>>          the header
> >>>>>>>>>>>>>>>>>          > >>>>>> stuff.
> >>>>>>>>>>>>>>>>>          > >>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>> I think the abstraction of always
> >>>>> repartitioning
> >>>>>>>>>> back is
> >>>>>>>>>>>>>>>>>          just not so
> >>>>>>>>>>>>>>>>>          > >>>>>> strong. Like the work has been done
> >>> before
> >>>>> we
> >>>>>>>>>> partition
> >>>>>>>>>>>>>>>>>          back and
> >>>>>>>>>>>>>>>>>          > >>>>>> grouping
> >>>>>>>>>>>>>>>>>          > >>>>>> by something else afterwards is really
> >>>>> common.
> >>>>>>>>>>>>>>>>>          > >>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>> On 05.09.2018 13:49, Adam Bellemare
> >>> wrote:
> >>>>>>>>>>>>>>>>>          > >>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>> Hi Matthias
> >>>>>>>>>>>>>>>>>          > >>>>>>> Thank you for your feedback, I do
> >>>>> appreciate
> >>>>>>> it!
> >>>>>>>>>>>>>>>>>          > >>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>> While name spacing would be possible,
> it
> >>>>> would
> >>>>>>>>>> require
> >>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>          > deserialize
> >>>>>>>>>>>>>>>>>          > >>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>>> user headers what implies a runtime
> >>>>> overhead.
> >>>>>>> I
> >>>>>>>>>> would
> >>>>>>>>>>>>>>>>>          suggest to
> >>>>>>>>>>>>>>>>>          > not
> >>>>>>>>>>>>>>>>>          > >>>>>>>> namespace for now to avoid the
> >>> overhead.
> >>>>> If
> >>>>>>> this
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> becomes a
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>          > problem in
> >>>>>>>>>>>>>>>>>          > >>>>>>>> the future, we can still add name
> >>> spacing
> >>>>>>> later
> >>>>>>>>>> on.
> >>>>>>>>>>>>>>>>>          > >>>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>>> Agreed. I will go with using a
> reserved
> >>>>> string
> >>>>>>>>>> and
> >>>>>>>>>>>>>>>>>          document it.
> >>>>>>>>>>>>>>>>>          > >>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>> My main concern about the design is
> the
> >>>>> type of
> >>>>>>>>>> the
> >>>>>>>>>>>>>>>>>          result KTable:
> >>>>>>>>>>>>>>>>>          > If
> >>>>>>>>>>>>>>>>>          > >>>>>>> I
> >>>>>>>>>>>>>>>>>          > >>>>>>> understood the proposal correctly,
> >>>>>>>>>>>>>>>>>          > >>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>> In your example, you have table1 and
> >>> table2
> >>>>>>>>>> swapped.
> >>>>>>>>>>>>>>>>>          Here is how it
> >>>>>>>>>>>>>>>>>          > >>>>>>> works
> >>>>>>>>>>>>>>>>>          > >>>>>>> currently:
> >>>>>>>>>>>>>>>>>          > >>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>> 1) table1 has the records that contain
> >>> the
> >>>>>>>>>> foreign key
> >>>>>>>>>>>>>>>>>          within their
> >>>>>>>>>>>>>>>>>          > >>>>>>> value.
> >>>>>>>>>>>>>>>>>          > >>>>>>> table1 input stream: <a,(fk=A,bar=1)>,
> >>>>>>>>>>>>>>>>> <b,(fk=A,bar=2)>,
> >>>>>>>>>>>>>>>>>          > >>>>>>> <c,(fk=B,bar=3)>
> >>>>>>>>>>>>>>>>>          > >>>>>>> table2 input stream: <A,X>, <B,Y>
> >>>>>>>>>>>>>>>>>          > >>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>> 2) A Value mapper is required to
> extract
> >>>>> the
> >>>>>>>>>> foreign
> >>>>>>>>>>>>>>>>> key.
> >>>>>>>>>>>>>>>>>          > >>>>>>> table1 foreign key mapper: ( value =>
> >>>>> value.fk
> >>>>>>>>>>>>>>>>>          <http://value.fk> )
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>> The mapper is applied to each element
> in
> >>>>>>> table1,
> >>>>>>>>>> and a
> >>>>>>>>>>>>>>>>>          new combined
> >>>>>>>>>>>>>>>>>          > >>>>>>> key is
> >>>>>>>>>>>>>>>>>          > >>>>>>> made:
> >>>>>>>>>>>>>>>>>          > >>>>>>> table1 mapped: <A-a, (fk=A,bar=1)>,
> >>> <A-b,
> >>>>>>>>>>>>>>>>> (fk=A,bar=2)>,
> >>>>>>>>>>>>>>>>>          <B-c,
> >>>>>>>>>>>>>>>>>          > >>>>>>> (fk=B,bar=3)>
> >>>>>>>>>>>>>>>>>          > >>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>> 3) The rekeyed events are
> copartitioned
> >>>>> with
> >>>>>>>>>> table2:
> >>>>>>>>>>>>>>>>>          > >>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>> a) Stream Thread with Partition 0:
> >>>>>>>>>>>>>>>>>          > >>>>>>> RepartitionedTable1: <A-a,
> >>> (fk=A,bar=1)>,
> >>>>> <A-b,
> >>>>>>>>>>>>>>>>>          (fk=A,bar=2)>
> >>>>>>>>>>>>>>>>>          > >>>>>>> Table2: <A,X>
> >>>>>>>>>>>>>>>>>          > >>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>> b) Stream Thread with Partition 1:
> >>>>>>>>>>>>>>>>>          > >>>>>>> RepartitionedTable1: <B-c,
> (fk=B,bar=3)>
> >>>>>>>>>>>>>>>>>          > >>>>>>> Table2: <B,Y>
> >>>>>>>>>>>>>>>>>          > >>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>> 4) From here, they can be joined
> >>> together
> >>>>>>> locally
> >>>>>>>>>> by
> >>>>>>>>>>>>>>>>>          applying the
> >>>>>>>>>>>>>>>>>          > >>>>>>> joiner
> >>>>>>>>>>>>>>>>>          > >>>>>>> function.
> >>>>>>>>>>>>>>>>>          > >>>>>>>
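Steps 1) to 4) above can be sketched in plain Java using the same example records. This is a simplified model under stated assumptions, not the KIP implementation: ForeignKeyJoinSketch, Row, rekey and join are hypothetical names, the combined key is a plain "fk-pk" string (so keys must not contain "-"), a TreeMap stands in for the sorted RocksDB store, and co-partitioning across stream threads is not modelled at all.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of rekeying by foreign key and joining locally.
final class ForeignKeyJoinSketch {

    // table1 value: carries the foreign key inside the value.
    static final class Row {
        final String fk;
        final int bar;

        Row(String fk, int bar) {
            this.fk = fk;
            this.bar = bar;
        }
    }

    // Step 2: apply the foreign key mapper and build the combined key "<fk>-<pk>".
    static TreeMap<String, Row> rekey(Map<String, Row> table1) {
        TreeMap<String, Row> rekeyed = new TreeMap<>();
        for (Map.Entry<String, Row> e : table1.entrySet()) {
            rekeyed.put(e.getValue().fk + "-" + e.getKey(), e.getValue());
        }
        return rekeyed;
    }

    // Steps 3 and 4: for each table2 record, scan the sorted combined keys that
    // share its key as prefix and apply a (here: string-building) joiner.
    // The result is keyed by the original table1 key.
    static Map<String, String> join(TreeMap<String, Row> rekeyed, Map<String, String> table2) {
        Map<String, String> joined = new LinkedHashMap<>();
        for (Map.Entry<String, String> right : table2.entrySet()) {
            String prefix = right.getKey() + "-";
            for (Map.Entry<String, Row> left : rekeyed.tailMap(prefix, true).entrySet()) {
                if (!left.getKey().startsWith(prefix)) {
                    break; // left the range of this foreign key
                }
                String originalKey = left.getKey().substring(prefix.length());
                joined.put(originalKey, "bar=" + left.getValue().bar + "," + right.getValue());
            }
        }
        return joined;
    }
}
```

With the inputs from the example, records a and b end up joined with X and record c with Y. The sketch deliberately leaves out the later repartitioning and out-of-order resolution discussed further down.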
> >>>>>>>>>>>>>>>>>          > >>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>> At this point, Jan's design and my
> >>> design
> >>>>>>>>>> deviate. My
> >>>>>>>>>>>>>>>>>          design goes
> >>>>>>>>>>>>>>>>>          > on
> >>>>>>>>>>>>>>>>>          > >>>>>>> to
> >>>>>>>>>>>>>>>>>          > >>>>>>> repartition the data post-join and
> >>> resolve
> >>>>>>>>>> out-of-order
> >>>>>>>>>>>>>>>>>          arrival of
> >>>>>>>>>>>>>>>>>          > >>>>>>> records,
> >>>>>>>>>>>>>>>>>          > >>>>>>> finally returning the data keyed by just
> >>> the
> >>>>>>>>>> original key.
> >>>>>>>>>>>>>>>>>          I do not
> >>>>>>>>>>>>>>>>>          > >>>>>>> expose
> >>>>>>>>>>>>>>>>>          > >>>>>>> the
> >>>>>>>>>>>>>>>>>          > >>>>>>> CombinedKey or any of the internals
> >>>>> outside of
> >>>>>>> the
> >>>>>>>>>>>>>>>>>          joinOnForeignKey
> >>>>>>>>>>>>>>>>>          > >>>>>>> function. This does make for a larger
> >>>>> footprint,
> >>>>>>>>>> but it
> >>>>>>>>>>>>>>>>>          removes all
> >>>>>>>>>>>>>>>>>          > >>>>>>> agency
> >>>>>>>>>>>>>>>>>          > >>>>>>> for resolving out-of-order arrivals
> and
> >>>>>>> handling
> >>>>>>>>>>>>>>>>>          CombinedKeys from
> >>>>>>>>>>>>>>>>>          > the
> >>>>>>>>>>>>>>>>>          > >>>>>>> user. I believe that this makes the
> >>>>> function
> >>>>>>> much
> >>>>>>>>>>>>>>>>> easier
> >>>>>>>>>>>>>>>>>          to use.
> >>>>>>>>>>>>>>>>>          > >>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>> Let me know if this helps resolve your
> >>>>>>> questions,
> >>>>>>>>>> and
> >>>>>>>>>>>>>>>>>          please feel
> >>>>>>>>>>>>>>>>>          > >>>>>>> free to
> >>>>>>>>>>>>>>>>>          > >>>>>>> add anything else on your mind.
> >>>>>>>>>>>>>>>>>          > >>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>> Thanks again,
> >>>>>>>>>>>>>>>>>          > >>>>>>> Adam
> >>>>>>>>>>>>>>>>>          > >>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>> On Tue, Sep 4, 2018 at 8:36 PM,
> >>> Matthias J.
> >>>>>>> Sax <
> >>>>>>>>>>>>>>>>>          > >>>>>>> matth...@confluent.io <mailto:
> >>>>>>>>>> matth...@confluent.io>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>> wrote:
> >>>>>>>>>>>>>>>>>          > >>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>> Hi,
> >>>>>>>>>>>>>>>>>          > >>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>>> I am just catching up on this
> thread. I
> >>>>> did
> >>>>>>> not
> >>>>>>>>>> read
> >>>>>>>>>>>>>>>>>          everything so
> >>>>>>>>>>>>>>>>>          > >>>>>>>> far,
> >>>>>>>>>>>>>>>>>          > >>>>>>>> but want to share a couple of initial
> >>>>> thoughts:
> >>>>>>>>>>>>>>>>>          > >>>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>>> Headers: I think there is a
> fundamental
> >>>>>>>>>> difference
> >>>>>>>>>>>>>>>>>          between header
> >>>>>>>>>>>>>>>>>          > >>>>>>>> usage
> >>>>>>>>>>>>>>>>>          > >>>>>>>> in this KIP and KIP-258. For 258, we
> add
> >>>>>>> headers
> >>>>>>>>>> to
> >>>>>>>>>>>>>>>>>          changelog topic
> >>>>>>>>>>>>>>>>>          > >>>>>>>> that
> >>>>>>>>>>>>>>>>>          > >>>>>>>> are owned by Kafka Streams and nobody
> >>>>> else is
> >>>>>>>>>> supposed
> >>>>>>>>>>>>>>>>>          to write
> >>>>>>>>>>>>>>>>>          > into
> >>>>>>>>>>>>>>>>>          > >>>>>>>> them. In fact, no user headers are
> >>> written
> >>>>> into
> >>>>>>>>>> the
> >>>>>>>>>>>>>>>>>          changelog topic
> >>>>>>>>>>>>>>>>>          > >>>>>>>> and
> >>>>>>>>>>>>>>>>>          > >>>>>>>> thus, there are no conflicts.
> >>>>>>>>>>>>>>>>>          > >>>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>>> Nevertheless, I don't see a big issue
> >>> with
> >>>>>>> using
> >>>>>>>>>>>>>>>>>          headers within
> >>>>>>>>>>>>>>>>>          > >>>>>>>> Streams.
> >>>>>>>>>>>>>>>>>          > >>>>>>>> As long as we document it, we can
> have
> >>>>> some
> >>>>>>>>>> "reserved"
> >>>>>>>>>>>>>>>>>          header keys
> >>>>>>>>>>>>>>>>>          > >>>>>>>> and
> >>>>>>>>>>>>>>>>>          > >>>>>>>> users are not allowed to use them when
> >>>>> processing
> >>>>>>>>>> data with
> >>>>>>>>>>>>>>>>>          Kafka
> >>>>>>>>>>>>>>>>>          > Streams.
> >>>>>>>>>>>>>>>>>          > >>>>>>>> IMHO, this should be ok.
> >>>>>>>>>>>>>>>>>          > >>>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>>> I think there is a safe way to avoid
> >>>>>>> conflicts,
> >>>>>>>>>> since
> >>>>>>>>>>>>>>>>> these
> >>>>>>>>>>>>>>>>>          > headers
> >>>>>>>>>>>>>>>>>          > >>>>>>>> are
> >>>>>>>>>>>>>>>>>          > >>>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>>>> only needed in internal topics (I
> >>> think):
> >>>>>>>>>>>>>>>>>          > >>>>>>>>> For internal and changelog topics,
> we
> >>> can
> >>>>>>>>>> namespace
> >>>>>>>>>>>>>>>>>          all headers:
> >>>>>>>>>>>>>>>>>          > >>>>>>>>> * user-defined headers are
> namespaced
> >>> as
> >>>>>>>>>> "external."
> >>>>>>>>>>>>>>>>> +
> >>>>>>>>>>>>>>>>>          headerKey
> >>>>>>>>>>>>>>>>>          > >>>>>>>>> * internal headers are namespaced as
> >>>>>>>>>> "internal." +
> >>>>>>>>>>>>>>>>>          headerKey
> >>>>>>>>>>>>>>>>>          > >>>>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>>>> While name spacing would be
> possible,
> >>> it
> >>>>>>> would
> >>>>>>>>>>>>>>>>> require
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>          > >>>>>>>> deserialize
> >>>>>>>>>>>>>>>>>          > >>>>>>>> user headers, which implies a runtime
> >>>>> overhead.
> >>>>>>> I
> >>>>>>>>>> would
> >>>>>>>>>>>>>>>>>          suggest to
> >>>>>>>>>>>>>>>>>          > not
> >>>>>>>>>>>>>>>>>          > >>>>>>>> namespace for now to avoid the
> >>> overhead.
> >>>>> If
> >>>>>>> this
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> becomes a
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>          > problem in
> >>>>>>>>>>>>>>>>>          > >>>>>>>> the future, we can still add name
> >>> spacing
> >>>>>>> later
> >>>>>>>>>> on.
> >>>>>>>>>>>>>>>>>          > >>>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>>> My main concern about the design is
> the
> >>>>> type
> >>>>>>> of
> >>>>>>>>>> the
> >>>>>>>>>>>>>>>>>          result KTable:
> >>>>>>>>>>>>>>>>>          > >>>>>>>> If I
> >>>>>>>>>>>>>>>>>          > >>>>>>>> understood the proposal correctly,
> >>>>>>>>>>>>>>>>>          > >>>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>>> KTable<K1,V1> table1 = ...
> >>>>>>>>>>>>>>>>>          > >>>>>>>> KTable<K2,V2> table2 = ...
> >>>>>>>>>>>>>>>>>          > >>>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>>> KTable<K1,V3> joinedTable =
> >>>>>>>>>> table1.join(table2,...);
> >>>>>>>>>>>>>>>>>          > >>>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>>> implies that the `joinedTable` has
> the
> >>>>> same
> >>>>>>> key
> >>>>>>>>>> as the
> >>>>>>>>>>>>>>>>>          left input
> >>>>>>>>>>>>>>>>>          > >>>>>>>> table.
> >>>>>>>>>>>>>>>>>          > >>>>>>>> IMHO, this does not work because if
> >>> table2
> >>>>>>>>>> contains
> >>>>>>>>>>>>>>>>>          multiple rows
> >>>>>>>>>>>>>>>>>          > >>>>>>>> that
> >>>>>>>>>>>>>>>>>          > >>>>>>>> join with a record in table1 (which is
> >>> the
> >>>>> main
> >>>>>>>>>> purpose
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>          > foreign
> >>>>>>>>>>>>>>>>>          > >>>>>>>> key
> >>>>>>>>>>>>>>>>>          > >>>>>>>> join), the result table would only
> >>>>> contain a
> >>>>>>>>>> single
> >>>>>>>>>>>>>>>>>          join result,
> >>>>>>>>>>>>>>>>>          > but
> >>>>>>>>>>>>>>>>>          > >>>>>>>> not
> >>>>>>>>>>>>>>>>>          > >>>>>>>> multiple.
> >>>>>>>>>>>>>>>>>          > >>>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>>> Example:
> >>>>>>>>>>>>>>>>>          > >>>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>>> table1 input stream: <A,X>
> >>>>>>>>>>>>>>>>>          > >>>>>>>> table2 input stream: <a,(A,1)>,
> >>> <b,(A,2)>
> >>>>>>>>>>>>>>>>>          > >>>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>>> We use table2 value as a foreign key to
> >>>>> table1
> >>>>>>> key
> >>>>>>>>>> (ie,
> >>>>>>>>>>>>>>>>>          "A" joins).
> >>>>>>>>>>>>>>>>>          > If
> >>>>>>>>>>>>>>>>>          > >>>>>>>> the
> >>>>>>>>>>>>>>>>>          > >>>>>>>> result key is the same key as key of
> >>>>> table1,
> >>>>>>> this
> >>>>>>>>>>>>>>>>>          implies that the
> >>>>>>>>>>>>>>>>>          > >>>>>>>> result can either be <A, join(X,1)>
> or
> >>> <A,
> >>>>>>>>>> join(X,2)>
> >>>>>>>>>>>>>>>>>          but not
> >>>>>>>>>>>>>>>>>          > both.
> >>>>>>>>>>>>>>>>>          > >>>>>>>> Because they share the same key,
> >>> whatever
> >>>>>>> result
> >>>>>>>>>> record
> >>>>>>>>>>>>>>>>>          we emit
> >>>>>>>>>>>>>>>>>          > later,
> >>>>>>>>>>>>>>>>>          > >>>>>>>> overwrites the previous result.
> >>>>>>>>>>>>>>>>>          > >>>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>>> This is the reason why Jan originally
> >>>>> proposed
> >>>>>>>>>> to use
> >>>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>          > combination
> >>>>>>>>>>>>>>>>>          > >>>>>>>> of
> >>>>>>>>>>>>>>>>>          > >>>>>>>> both primary keys of the input tables
> >>> as
> >>>>> key
> >>>>>>> of
> >>>>>>>>>> the
> >>>>>>>>>>>>>>>>>          output table.
> >>>>>>>>>>>>>>>>>          > >>>>>>>> This
> >>>>>>>>>>>>>>>>>          > >>>>>>>> makes the keys of the output table
> >>> unique
> >>>>> and
> >>>>>>> we
> >>>>>>>>>> can
> >>>>>>>>>>>>>>>>>          store both in
> >>>>>>>>>>>>>>>>>          > >>>>>>>> the
> >>>>>>>>>>>>>>>>>          > >>>>>>>> output table:
> >>>>>>>>>>>>>>>>>          > >>>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>>> Result would be <A-a, join(X,1)>,
> <A-b,
> >>>>>>>>>> join(X,2)>
> >>>>>>>>>>>>>>>>>          > >>>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>>> Thoughts?
> >>>>>>>>>>>>>>>>>          > >>>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>          > >>>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>>> On 9/4/18 1:36 PM, Jan Filipiak
> wrote:
> >>>>>>>>>>>>>>>>>          > >>>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>>> Just one remark here.
> >>>>>>>>>>>>>>>>>          > >>>>>>>>> The high-watermark could be
> >>> disregarded.
> >>>>> The
> >>>>>>>>>> decision
> >>>>>>>>>>>>>>>>>          about the
> >>>>>>>>>>>>>>>>>          > >>>>>>>>> forward
> >>>>>>>>>>>>>>>>>          > >>>>>>>>> depends on the size of the
> aggregated
> >>>>> map.
> >>>>>>>>>>>>>>>>>          > >>>>>>>>> Only 1 element long maps would be
> >>>>> unpacked
> >>>>>>> and
> >>>>>>>>>>>>>>>>>          forwarded. 0
> >>>>>>>>>>>>>>>>>          > element
> >>>>>>>>>>>>>>>>>          > >>>>>>>>> maps
> >>>>>>>>>>>>>>>>>          > >>>>>>>>> would be published as delete. Any
> >>> other
> >>>>> count
> >>>>>>>>>>>>>>>>>          > >>>>>>>>> of map entries is in "waiting for
> >>> correct
> >>>>>>>>>> deletes to
> >>>>>>>>>>>>>>>>>          > arrive"-state.
> >>>>>>>>>>>>>>>>>          > >>>>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>>>> On 04.09.2018 21:29, Adam Bellemare
> >>>>> wrote:
> >>>>>>>>>>>>>>>>>          > >>>>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>>>> It does look like I could replace
> the
> >>>>> second
> >>>>>>>>>>>>>>>>>          repartition store
> >>>>>>>>>>>>>>>>>          > and
> >>>>>>>>>>>>>>>>>          > >>>>>>>>>> highwater store with a groupBy and
> >>>>> reduce.
> >>>>>>>>>> However,
> >>>>>>>>>>>>>>>>>          it looks
> >>>>>>>>>>>>>>>>>          > like
> >>>>>>>>>>>>>>>>>          > >>>>>>>>>> I
> >>>>>>>>>>>>>>>>>          > >>>>>>>>>> would
> >>>>>>>>>>>>>>>>>          > >>>>>>>>>> still need to store the highwater
> >>> value
> >>>>>>> within
> >>>>>>>>>> the
> >>>>>>>>>>>>>>>>>          materialized
> >>>>>>>>>>>>>>>>>          > >>>>>>>>>> store,
> >>>>>>>>>>>>>>>>>          > >>>>>>>>>>
> >>>>>>>>>>>>>>>>>          > >>>>>>>>>> to
> >>>>>>>>>>>>>>>>>          > >>>>>>>>> compare the arrival of out-of-order
> >>>>> records
> >>>>>>>>>> (assuming
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> my
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>          > >>>>>>>>> understanding
> >>>>>>>>>>>>>>>>>          > >>>>>>>>> of
> >>>>>>>>>>>>>>>>>          > >>>>>>>>> THIS is correct...). This in effect
> is
> >>>>> the
> >>>>>>> same
> >>>>>>>>>> as
> >>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>          design I
> >>>>>>>>>>>>>>>>>          > have
> >>>>>>>>>>>>>>>>>          > >>>>>>>>> now,
> >>>>>>>>>>>>>>>>>          > >>>>>>>>> just with the two tables merged
> >>> together.
> >>>>>>>>>>>>>>>>>          > >>>>>>>>>
> >>>>>>>>>>>>>>>>>          >
> >>>>>>>>>>>>>>>>>          >
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> --
> >>>>>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> >
>


-- 
-- Guozhang
