I think I'll have to elaborate a bit, so I created a proof-of-concept
implementation of my ideas and ran some throughput measurements to
alleviate concerns about performance.

First, though, I want to highlight again why the current approach does not
work with out-of-order elements (which, again, occur constantly due to the
distributed nature of the system). This is the example I posted earlier:
https://gist.github.com/aljoscha/a367012646ab98208d27. The plan looks like
this:

+--+
|  | Source
+--+
 |
 +-----+
 |     |
 |    +--+
 |    |  | Identity Map
 |    +--+
 |     |
 +-----+
 |
+--+
|  | Window
+--+
 |
 |
+--+
|  | Sink
+--+

So all it does is pass the elements through an identity map and then merge
them again before the window operator. The source emits ascending integers
and the window operator has a custom timestamp extractor that uses the
integer itself as the timestamp and should create windows of size 4 (that
is, elements with timestamps 0-3 form one window, elements with timestamps
4-7 form the next, and so on). Since the topology basically doubles the
elements from the source, I would expect to get these windows:
Window: 0, 0, 1, 1, 2, 2, 3, 3
Window: 4, 4, 5, 5, 6, 6, 7, 7
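
To make the intended semantics concrete, here is a minimal Python sketch of
the windowing rule described above (window index = timestamp divided by 4,
rounded down). It is not the code from the gist; the function name and the
simulated arrival order are assumptions made purely for illustration.

```python
from collections import defaultdict

WINDOW_SIZE = 4  # window size used in the example


def expected_windows(elements, size=WINDOW_SIZE):
    """Group elements by event-time window, independent of arrival order."""
    windows = defaultdict(list)
    for ts in elements:
        # In the example, the integer value doubles as the timestamp.
        windows[ts // size].append(ts)
    return [sorted(windows[k]) for k in sorted(windows)]


# The source emits 0..7 and the union with the identity map doubles every
# element. Here the second copy arrives "late" to mimic out-of-order arrival.
stream = list(range(8)) + list(range(8))
for window in expected_windows(stream):
    print("Window:", ", ".join(map(str, window)))
```

The point is that the grouping depends only on the timestamp, so any
interleaving of the two copies of the stream gives the same windows.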

The output is this, however:
Window: 0, 1, 2, 3,
Window: 4, 0, 1, 2, 3, 4, 5, 6, 7,
Window: 8, 9, 10, 11,
Window: 12, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
Window: 16, 17, 18, 19,
Window: 20, 21, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
Window: 24, 25, 26, 27,
Window: 28, 29, 30, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31,

The reason is that the elements simply arrive out-of-order. Imagine what
would happen if the elements actually arrived with some delay from
different operations.

Now, on to the performance numbers. The proof-of-concept I created is
available here:
https://github.com/aljoscha/flink/tree/event-time-window-fn-mock. The basic
idea is that sources assign the current timestamp when emitting elements.
They also periodically emit watermarks that tell us that no elements with
an earlier timestamp will be emitted. The watermarks propagate through the
operators. The window operator looks at the timestamp of an element and
puts it into the buffer that corresponds to that window. When the window
operator receives a watermark it will look at the in-flight windows
(basically the buffers) and emit those windows where the window-end is
before the watermark.
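
The mechanism described above can be sketched in a few lines of Python. This
is a simplified stand-in for the idea, not the code from the branch: the
class and method names are hypothetical, windows are tumbling, and elements
that arrive behind the watermark are not handled at all.

```python
from collections import defaultdict


class EventTimeWindowOperator:
    """Tumbling event-time windows that fire once a watermark passes their end."""

    def __init__(self, size):
        self.size = size
        self.buffers = defaultdict(list)  # window start -> buffered elements

    def on_element(self, timestamp, value):
        # Assign the element by its timestamp, not by its arrival order.
        start = timestamp - (timestamp % self.size)
        self.buffers[start].append(value)

    def on_watermark(self, watermark):
        # A watermark promises that no element with an earlier timestamp will
        # follow, so every window [start, start + size) with
        # start + size <= watermark is complete and can be emitted.
        ready = sorted(s for s in self.buffers if s + self.size <= watermark)
        return [(s, self.buffers.pop(s)) for s in ready]


op = EventTimeWindowOperator(size=4)
for ts in [0, 1, 4, 2, 0, 3, 5, 1]:  # out-of-order arrival
    op.on_element(ts, ts)

first = op.on_watermark(4)   # window [0, 4) is complete
second = op.on_watermark(8)  # window [4, 8) is complete
print(first, second)
```

Note that the window starting at 4 stays in flight until the second
watermark, even though some of its elements arrived before the first window
was emitted.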

For measuring throughput I did the following: The source emits tuples of
the form ("tuple", 1) in an infinite loop. The window operator sums up the
tuples, thereby counting how many tuples the window operator can handle in
a given time window. There are two different implementations for the
summation: 1) simply summing up the values in a mapWindow(), where you get
a List of all tuples and simply iterate over it; 2) using sum(1), which is
implemented as a reduce() (that uses the pre-reducer optimisations).
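
The difference between the two variants can be illustrated with plain Python
stand-ins. These are not the Flink API calls; both function names are made up
for this sketch, and it only shows what each variant has to keep around.

```python
from functools import reduce


def sum_map_window(buffered):
    """Variant 1: the window hands over all buffered tuples at once."""
    total = 0
    for _key, count in buffered:
        total += count
    return ("tuple", total)


def sum_reduce(a, b):
    """Variant 2: pairwise reduce; only the running result must be kept."""
    return (a[0], a[1] + b[1])


tuples = [("tuple", 1)] * 1000
via_map = sum_map_window(tuples)         # needs the whole list in memory
via_reduce = reduce(sum_reduce, tuples)  # could be applied as elements arrive
print(via_map, via_reduce)
```

Both produce the same count; the reduce variant simply does not need the
buffered list if it is applied incrementally.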

These are the performance numbers (Current is the current implementation,
Next is my proof-of-concept):

Tumbling (1 sec):
 - Current/Map: 1.6 mio
 - Current/Reduce: 2 mio
 - Next/Map: 2.2 mio
 - Next/Reduce: 4 mio

Sliding (5 sec, slide 1 sec):
 - Current/Map: ca 3 mio (fluctuates a lot)
 - Current/Reduce: No output
 - Next/Map: ca 4 mio (fluctuates)
 - Next/Reduce: 10 mio

The Next/Reduce variant can basically scale indefinitely with window size
because the internal state does not depend on the number of elements (it is
just the current sum). The current pre-reducer for sliding windows cannot
handle this number of tuples and produces no output. For the two Map
variants the performance fluctuates because they always keep all the
elements in an internal buffer before emission; this seems to tax the
garbage collector a bit and leads to random pauses.
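
To illustrate why the state of a reduce does not grow with the number of
elements, here is a small Python sketch of sliding windows that keep one
running sum per in-flight window. This is an assumption about the general
idea, not a description of how the proof-of-concept is implemented; the class
name and the use of integer seconds as timestamps are made up for this sketch.

```python
from collections import defaultdict


class SlidingSum:
    """Sliding event-time windows that keep one running sum per window."""

    def __init__(self, size, slide):
        self.size, self.slide = size, slide
        self.sums = defaultdict(int)  # window start -> running sum

    def on_element(self, timestamp, value):
        # Walk over every window [start, start + size) containing the timestamp.
        start = timestamp - (timestamp % self.slide)
        while start > timestamp - self.size:
            self.sums[start] += value
            start -= self.slide

    def on_watermark(self, watermark):
        # Emit the sums of all windows that end at or before the watermark.
        ready = sorted(s for s in self.sums if s + self.size <= watermark)
        return [(s, self.sums.pop(s)) for s in ready]


op = SlidingSum(size=5, slide=1)
for second in range(10):
    for _ in range(1000):
        op.on_element(second, 1)

print(len(op.sums))  # 14 window sums for 10,000 elements
fired = op.on_watermark(5)
print(fired)
```

The number of entries depends on how many windows are in flight, not on how
many tuples were seen, which is why a buffer-free reduce puts so little
pressure on the garbage collector.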

One thing that should be noted is that I had to disable the fake-element
emission thread, otherwise the Current versions would deadlock.

So, I started working on this because I thought that out-of-order
processing would be necessary for correctness. And it certainly is. But the
proof-of-concept also shows that performance can be greatly improved.

On Wed, 24 Jun 2015 at 09:46 Gyula Fóra <gyula.f...@gmail.com> wrote:
>
> I agree, let's separate these topics from each other so we can get faster
> resolution.
>
> There is already a state discussion in the thread we started with Paris.
>
> On Wed, Jun 24, 2015 at 9:24 AM Kostas Tzoumas <ktzou...@apache.org> wrote:
>
> > I agree with supporting out-of-order out of the box :-), even if this
> > means a major refactoring. This is the right time to refactor the
> > streaming API before we pull it out of beta. I think that this is more
> > important than new features in the streaming API, which can be
> > prioritized once the API is out of beta (meaning, that IMO this is the
> > right time to stall PRs until we agree on the design).
> >
> > There are three sections in the document: windowing, state, and API. How
> > convoluted are those with each other? Can we separate the discussion or
> > do we need to discuss those all together? I think part of the difficulty
> > is that we are discussing three design choices at once.
> >
> > On Tue, Jun 23, 2015 at 7:43 PM, Ted Dunning <ted.dunn...@gmail.com>
> > wrote:
> >
> > > Out of order is ubiquitous in the real-world.  Typically, what happens
> > > is that businesses will declare a maximum allowable delay for delayed
> > > transactions and will commit to results when that delay is reached.
> > > Transactions that arrive later than this cutoff are collected specially
> > > as corrections which are reported/used when possible.
> > >
> > > Clearly, ordering can also be violated during processing, but if the
> > > data is originally out of order the situation can't be repaired by any
> > > protocol fixes that prevent transactions from becoming disordered but
> > > has to be handled at the data level.
> > >
> > >
> > >
> > >
> > > On Tue, Jun 23, 2015 at 9:29 AM, Aljoscha Krettek <aljos...@apache.org>
> > > wrote:
> > >
> > > > I also don't like big changes but sometimes they are necessary. The
> > > > reason why I'm so adamant about out-of-order processing is that
> > > > out-of-order elements are not some exception that occurs once in a
> > > > while; they occur constantly in a distributed system. For example, in
> > > > this: https://gist.github.com/aljoscha/a367012646ab98208d27 the
> > > > resulting windows are completely bogus because the current windowing
> > > > system assumes elements to globally arrive in order, which is simply
> > > > not true. (The example has a source that generates increasing
> > > > integers. Then these pass through a map and are unioned with the
> > > > original DataStream before a window operator.) This simulates elements
> > > > arriving from different operators at a windowing operator. The example
> > > > is also DOP=1, I imagine this to get worse with higher DOP.
> > > >
> > > > What do you mean by costly? As I said, I have a proof-of-concept
> > > > windowing operator that can handle out-of-order elements. This is an
> > > > example using the current Flink API:
> > > > https://gist.github.com/aljoscha/f8dce0691732e344bbe8.
> > > > (It is an infinite source of tuples and a 5 second window operator
> > > > that counts the tuples.) The first problem is that this code deadlocks
> > > > because of the thread that emits fake elements. If I disable the fake
> > > > element code it works, but the throughput using my mockup is 4 times
> > > > higher. The gap widens dramatically if the window size increases.
> > > >
> > > > So, it actually increases performance (unless I'm making a mistake in
> > > > my explorations) and can handle elements that arrive out-of-order
> > > > (which happens basically always in real-world windowing use-cases).
> > > >
> > > >
> > > > On Tue, 23 Jun 2015 at 12:51 Stephan Ewen <se...@apache.org> wrote:
> > > >
> > > > > What I like a lot about Aljoscha's proposed design is that we need
> > > > > no different code for "system time" vs. "event time". It only
> > > > > differs in where the timestamps are assigned.
> > > > >
> > > > > The OOP approach also gives you the semantics of total ordering
> > > > > without imposing merges on the streams.
> > > > >
> > > > > On Tue, Jun 23, 2015 at 10:43 AM, Matthias J. Sax <
> > > > > mj...@informatik.hu-berlin.de> wrote:
> > > > >
> > > > > > I agree that there should be multiple alternatives the user(!) can
> > > > > > choose from. Partial out-of-order processing works for many/most
> > > > > > aggregates. However, if you consider Event-Pattern-Matching,
> > > > > > global ordering is necessary (even if the performance penalty
> > > > > > might be high).
> > > > > >
> > > > > > I would also keep "system-time windows" as an alternative to
> > > > > > "source assigned ts-windows".
> > > > > >
> > > > > > It might also be interesting to consider the following paper for
> > > > > > overlapping windows: "Resource sharing in continuous
> > > > > > sliding-window aggregates"
> > > > > >
> > > > > > > https://dl.acm.org/citation.cfm?id=1316720
> > > > > >
> > > > > > -Matthias
> > > > > >
> > > > > > On 06/23/2015 10:37 AM, Gyula Fóra wrote:
> > > > > > > Hey
> > > > > > >
> > > > > > > I think we should not block PRs unnecessarily if your suggested
> > > > > > > changes might touch them at some point.
> > > > > > >
> > > > > > > Also I still think we should not put everything in the
> > > > > > > Datastream because it will be a huge mess.
> > > > > > >
> > > > > > > Also we need to agree on the out of order processing, whether we
> > > > > > > want it the way you proposed it (which is quite costly). Another
> > > > > > > alternative approach there which fits in the current windowing
> > > > > > > is to filter out-of-order events and apply a special handling
> > > > > > > operator on them. This would be fairly lightweight.
> > > > > > >
> > > > > > > My point is that we need to consider some alternative solutions.
> > > > > > > And we should not block contributions along the way.
> > > > > > >
> > > > > > > Cheers
> > > > > > > Gyula
> > > > > > >
> > > > > > > On Tue, Jun 23, 2015 at 9:55 AM Aljoscha Krettek <aljos...@apache.org>
> > > > > > > wrote:
> > > > > > >
> > > > > > >> The reason I posted this now is that we need to think about the
> > > > > > >> API and windowing before proceeding with the PRs of Gabor
> > > > > > >> (inverse reduce) and Gyula (removal of "aggregate" functions on
> > > > > > >> DataStream).
> > > > > > >>
> > > > > > >> For the windowing, I think that the current model does not work
> > > > > > >> for out-of-order processing. Therefore, the whole windowing
> > > > > > >> infrastructure will basically have to be redone. Meaning also
> > > > > > >> that any work on the pre-aggregators or optimizations that we do
> > > > > > >> now becomes useless.
> > > > > > >>
> > > > > > >> For the API, I proposed to restructure the interactions between
> > > > > > >> all the different *DataStream classes and grouping/windowing.
> > > > > > >> (See API section of the doc I posted.)
> > > > > > >>
> > > > > > > >> On Mon, 22 Jun 2015 at 21:56 Gyula Fóra <gyula.f...@gmail.com>
> > > > > > > >> wrote:
> > > > > > > >>
> > > > > > > >>> Hi Aljoscha,
> > > > > > > >>>
> > > > > > > >>> Thanks for the nice summary, this is a very good initiative.
> > > > > > > >>>
> > > > > > > >>> I added some comments to the respective sections (where I
> > > > > > > >>> didn't fully agree :).).
> > > > > > > >>> At some point I think it would be good to have a public
> > > > > > > >>> hangout session on this, which could make a more dynamic
> > > > > > > >>> discussion.
> > > > > > > >>>
> > > > > > > >>> Cheers,
> > > > > > > >>> Gyula
> > > > > > >>>
> > > > > > > >>> Aljoscha Krettek <aljos...@apache.org> wrote (on Mon, 22 Jun
> > > > > > > >>> 2015, 21:34):
> > > > > > >>>
> > > > > > > >>>> Hi,
> > > > > > > >>>> with people proposing changes to the streaming part I also
> > > > > > > >>>> wanted to throw my hat into the ring. :D
> > > > > > > >>>>
> > > > > > > >>>> During the last few months, while I was getting acquainted
> > > > > > > >>>> with the streaming system, I wrote down some thoughts I had
> > > > > > > >>>> about how things could be improved. Hopefully, they are in
> > > > > > > >>>> somewhat coherent shape now, so please have a look if you are
> > > > > > > >>>> interested in this:
> > > > > > > >>>>
> > > > > > > >>>> https://docs.google.com/document/d/1rSoHyhUhm2IE30o5tkR8GEetjFvMRMNxvsCfoPsW6_4/edit?usp=sharing
> > > > > > >>>>
> > > > > > >>>> This mostly covers:
> > > > > > >>>>  - Timestamps assigned at sources
> > > > > > >>>>  - Out-of-order processing of elements in window operators
> > > > > > >>>>  - API design
> > > > > > >>>>
> > > > > > > >>>> Please let me know what you think. Comment in the document or
> > > > > > > >>>> here in the mailing list.
> > > > > > > >>>>
> > > > > > > >>>> I have a PR in the makings that would introduce source
> > > > > > > >>>> timestamps and watermarks for keeping track of them. I also
> > > > > > > >>>> hacked a proof-of-concept of a windowing system that is able
> > > > > > > >>>> to process out-of-order elements using a FlatMap operator.
> > > > > > > >>>> (It uses panes to perform efficient pre-aggregations.)
> > > > > > >>>>
> > > > > > >>>> Cheers,
> > > > > > >>>> Aljoscha
> > > > > > >>>>
> > > > > > >>>
> > > > > > >>
> > > > > > >
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
