Hey all,

We have spent some time with Asterios, Paris and Jonas to finalize the
windowing semantics (both the current features and the window join), and I
think we made very have come up with a very clear picture.

We will write down the proposed semantics and publish it to the wiki next
week.

Cheers,
Gyula

On Thu, Apr 16, 2015 at 5:50 PM, Asterios Katsifodimos <
asterios.katsifodi...@tu-berlin.de> wrote:

> As far as I see in [1], Peter's/Gyula's suggestion is what Infosphere
> Streams does: symmetric hash join.
>
> From [1]:
> "When a tuple is received on an input port, it is inserted into the window
> corresponding to the input port, which causes the window to trigger. As
> part of the trigger processing, the tuple is compared against all tuples
> inside the window of the opposing input port. If the tuples match, then an
> output tuple will be produced for each match. If at least one output was
> generated, a window punctuation will be generated after all the outputs."
>
> Cheers,
> Asterios
>
> [1]
>
> http://www-01.ibm.com/support/knowledgecenter/#!/SSCRJU_3.2.1/com.ibm.swg.im.infosphere.streams.spl-standard-toolkit-reference.doc/doc/join.html
>
>
>
> On Thu, Apr 9, 2015 at 1:30 PM, Matthias J. Sax <
> mj...@informatik.hu-berlin.de> wrote:
>
> > Hi Paris,
> >
> > thanks for the pointer to the Naiad paper. That is quite interesting.
> >
> > The paper I mentioned [1], does not describe the semantics in detail; it
> > is more about the implementation for the stream-joins. However, it uses
> > the same semantics (from my understanding) as proposed by Gyula.
> >
> > -Matthias
> >
> > [1] Kang, Naughton, Viglas. "Evaluationg Window Joins over Unbounded
> > Streams". VLDB 2002.
> >
> >
> >
> > On 04/07/2015 12:38 PM, Paris Carbone wrote:
> > > Hello Matthias,
> > >
> > > Sure, ordering guarantees are indeed a tricky thing, I recall having
> > that discussion back in TU Berlin. Bear in mind thought that DataStream,
> > our abstract data type, represents a *partitioned* unbounded sequence of
> > events. There are no *global* ordering guarantees made whatsoever in that
> > model across partitions. If you see it more generally there are many
> “race
> > conditions” in a distributed execution graph of vertices that process
> > multiple inputs asynchronously, especially when you add joins and
> > iterations into the mix (how do you deal with reprocessing “old” tuples
> > that iterate in the graph). Btw have you checked the Naiad paper [1]?
> > Stephan cited a while ago and it is quite relevant to that discussion.
> > >
> > > Also, can you cite the paper with the joining semantics you are
> > referring to? That would be of good help I think.
> > >
> > > Paris
> > >
> > > [1] https://users.soe.ucsc.edu/~abadi/Papers/naiad_final.pdf
> > >
> > > <https://users.soe.ucsc.edu/~abadi/Papers/naiad_final.pdf>
> > >
> > > <https://users.soe.ucsc.edu/~abadi/Papers/naiad_final.pdf>
> > > On 07 Apr 2015, at 11:50, Matthias J. Sax <
> mj...@informatik.hu-berlin.de
> > <mailto:mj...@informatik.hu-berlin.de>> wrote:
> > >
> > > Hi @all,
> > >
> > > please keep me in the loop for this work. I am highly interested and I
> > > want to help on it.
> > >
> > > My initial thoughts are as follows:
> > >
> > > 1) Currently, system timestamps are used and the suggested approach can
> > > be seen as state-of-the-art (there is actually a research paper using
> > > the exact same join semantic). Of course, the current approach is
> > > inherently non-deterministic. The advantage is, that there is no
> > > overhead in keeping track of the order of records and the latency
> should
> > > be very low. (Additionally, state-recovery is simplified. Because, the
> > > processing in inherently non-deterministic, recovery can be done with
> > > relaxed guarantees).
> > >
> > >  2) The user should be able to "switch on" deterministic processing,
> > > ie, records are timestamped (either externally when generated, or
> > > timestamped at the sources). Because deterministic processing adds some
> > > overhead, the user should decide for it actively.
> > > In this case, the order must be preserved in each re-distribution step
> > > (merging is sufficient, if order is preserved within each incoming
> > > channel). Furthermore, deterministic processing can be achieved by
> sound
> > > window semantics (and there is a bunch of them). Even for
> > > single-stream-windows it's a tricky problem; for join-windows it's even
> > > harder. From my point of view, it is less important which semantics are
> > > chosen; however, the user must be aware how it works. The most tricky
> > > part for deterministic processing, is to deal with duplicate timestamps
> > > (which cannot be avoided). The timestamping for (intermediate) result
> > > tuples, is also an important question to be answered.
> > >
> > >
> > > -Matthias
> > >
> > >
> > > On 04/07/2015 11:37 AM, Gyula Fóra wrote:
> > > Hey,
> > >
> > > I agree with Kostas, if we define the exact semantics how this works,
> > this
> > > is not more ad-hoc than any other stateful operator with multiple
> inputs.
> > > (And I don't think any other system support something similar)
> > >
> > > We need to make some design choices that are similar to the issues we
> had
> > > for windowing. We need to chose how we want to evaluate the windowing
> > > policies (global or local) because that affects what kind of policies
> can
> > > be parallel, but I can work on these things.
> > >
> > > I think this is an amazing feature, so I wouldn't necessarily rush the
> > > implementation for 0.9 though.
> > >
> > > And thanks for helping writing these down.
> > >
> > > Gyula
> > >
> > > On Tue, Apr 7, 2015 at 11:11 AM, Kostas Tzoumas <ktzou...@apache.org
> > <mailto:ktzou...@apache.org>> wrote:
> > >
> > > Yes, we should write these semantics down. I volunteer to help.
> > >
> > > I don't think that this is very ad-hoc. The semantics are basically the
> > > following. Assuming an arriving element from the left side:
> > > (1) We find the right-side matches
> > > (2) We insert the left-side arrival into the left window
> > > (3) We recompute the left window
> > > We need to see whether right window re-computation needs to be
> triggered
> > as
> > > well. I think that this way of joining streams is also what the
> symmetric
> > > hash join algorithms were meant to support.
> > >
> > > Kostas
> > >
> > >
> > > On Tue, Apr 7, 2015 at 10:49 AM, Stephan Ewen <se...@apache.org
> <mailto:
> > se...@apache.org>> wrote:
> > >
> > > Is the approach of joining an element at a time from one input against
> a
> > > window on the other input not a bit arbitrary?
> > >
> > > This just joins whatever currently happens to be the window by the time
> > > the
> > > single element arrives - that is a bit non-predictable, right?
> > >
> > > As a more general point: The whole semantics of windowing and when they
> > > are
> > > triggered are a bit ad-hoc now. It would be really good to start
> > > formalizing that a bit and
> > > put it down somewhere. Users need to be able to clearly understand and
> > > how
> > > to predict the output.
> > >
> > >
> > >
> > > On Fri, Apr 3, 2015 at 12:10 PM, Gyula Fóra <gyula.f...@gmail.com
> > <mailto:gyula.f...@gmail.com>>
> > > wrote:
> > >
> > > I think it should be possible to make this compatible with the
> > > .window().every() calls. Maybe if there is some trigger set in "every"
> > > we
> > > would not join that stream 1 by 1 but every so many elements. The
> > > problem
> > > here is that the window and every in this case are very-very different
> > > than
> > > the normal windowing semantics. The window would define the join window
> > > for
> > > each element of the other stream while every would define how often I
> > > join
> > > This stream with the other one.
> > >
> > > We need to think to make this intuitive.
> > >
> > > On Fri, Apr 3, 2015 at 11:23 AM, Márton Balassi <
> > > balassi.mar...@gmail.com<mailto:balassi.mar...@gmail.com>>
> > > wrote:
> > >
> > > That would be really neat, the problem I see there, that we do not
> > > distinguish between dataStream.window() and
> > > dataStream.window().every()
> > > currently, they both return WindowedDataStreams and TriggerPolicies
> > > of
> > > the
> > > every call do not make much sense in this setting (in fact
> > > practically
> > > the
> > > trigger is always set to count of one).
> > >
> > > But of course we could make it in a way, that we check that the
> > > eviction
> > > should be either null or count of 1, in every other case we throw an
> > > exception while building the JobGraph.
> > >
> > > On Fri, Apr 3, 2015 at 8:43 AM, Aljoscha Krettek <
> > > aljos...@apache.org<mailto:aljos...@apache.org>>
> > > wrote:
> > >
> > > Or you could define it like this:
> > >
> > > stream_A = a.window(...)
> > > stream_B = b.window(...)
> > >
> > > stream_A.join(stream_B).where().equals().with()
> > >
> > > So a join would just be a join of two WindowedDataStreamS. This
> > > would
> > > neatly move the windowing stuff into one place.
> > >
> > > On Thu, Apr 2, 2015 at 9:54 PM, Márton Balassi <
> > > balassi.mar...@gmail.com<mailto:balassi.mar...@gmail.com>
> > >
> > > wrote:
> > > Big +1 for the proposal for Peter and Gyula. I'm really for
> > > bringing
> > > the
> > > windowing and window join API in sync.
> > >
> > > On Thu, Apr 2, 2015 at 6:32 PM, Gyula Fóra <gyf...@apache.org<mailto:
> > gyf...@apache.org>>
> > > wrote:
> > >
> > > Hey guys,
> > >
> > > As Aljoscha has highlighted earlier the current window join
> > > semantics
> > > in
> > > the streaming api doesn't follow the changes in the windowing
> > > api.
> > > More
> > > precisely, we currently only support joins over time windows of
> > > equal
> > > size
> > > on both streams. The reason for this is that we now take a
> > > window
> > > of
> > > each
> > > of the two streams and do joins over these pairs. This would be
> > > a
> > > blocking
> > > operation if the windows are not closed at exactly the same time
> > > (and
> > > since
> > > we dont want this we only allow time windows)
> > >
> > > I talked with Peter who came up with the initial idea of an
> > > alternative
> > > approach for stream joins which works as follows:
> > >
> > > Instead of pairing windows for joins, we do element against
> > > window
> > > joins.
> > > What this means is that whenever we receive an element from one
> > > of
> > > the
> > > streams, we join this element with the current window(this
> > > window
> > > is
> > > constantly updated) of the other stream. This is non-blocking on
> > > any
> > > window
> > > definitions as we dont have to wait for windows to be completed
> > > and
> > > we
> > > can
> > > use this with any of our predefined policies like Time.of(...),
> > > Count.of(...), Delta.of(....).
> > >
> > > Additionally this also allows some very flexible way of defining
> > > window
> > > joins. With this we could also define grouped windowing inside
> > > if
> > > a
> > > join.
> > > An example of this would be: Join all elements of Stream1 with
> > > the
> > > last
> > > 5
> > > elements by a given windowkey of Stream2 on some join key.
> > >
> > > This feature can be easily implemented over the current
> > > operators,
> > > so
> > > I
> > > already have a working prototype for the simple non-grouped
> > > case.
> > > My
> > > only
> > > concern is the API, the best thing I could come up with is
> > > something
> > > like
> > > this:
> > >
> > > stream_A.join(stream_B).onWindow(windowDefA,
> > > windowDefB).by(windowKey1,
> > > windowKey2).where(...).equalTo(...).with(...)
> > >
> > > (the user can omit the "by" and "with" calls)
> > >
> > > I think this new approach would be worthy of our "flexible
> > > windowing"
> > > in
> > > contrast with the current approach.
> > >
> > > Regards,
> > > Gyula
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> >
> >
>

Reply via email to