Re: Full stream-stream join semantics

2019-11-24 Thread Jan Lukavský
I can put down a design document, but before that I need to clarify some 
things for myself. I'm struggling to fit all of this into the bigger 
picture. Sorry if the arguments keep going in circles, but I haven't 
seen any proposal for how to solve these issues. If anyone can disprove 
any of the following logic it would be very much appreciated, as it 
might get me out of a dead end:


 a) in the bi-temporal join you can either buffer until the watermark, or 
emit false data that has to be retracted later


 b) until retractions are 100% functional (and that is a sort of holy 
grail for now), the only solution is a buffer holding data up 
to the watermark *and then sorting by event time*


 c) even if retractions were 100% functional, there would have to be a 
special implementation for the batch case, because otherwise this would 
simply blow up downstream processing with insanely many false additions 
and subsequent retractions


Property b) means that if we want this feature now, we must sort by 
event time, and there is no way around it. Property c) shows that even in 
the future we must (in certain cases) make a distinction between batch 
and streaming code paths, which seems weird to me, but it might be an 
option. Still, there is no way to express this join in the batch case, 
because it would require either buffering (up to) the whole input on a local 
worker (which doesn't look like a viable option) or providing a way for user 
code to signal the need for ordering of data inside the GBK (and we are there 
again :)). Yes, we might shift this need from the stateful DoFn to the GBK, like


 input.apply(GroupByKey.sorted())

but I cannot find a good reason why this would be better than giving these 
semantics to a (stateful) ParDo.
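
(For illustration only: a rough, untested sketch of how the "buffer up to the 
watermark, then sort by event time" part could look with today's stateful 
ParDo primitives. The class name and element types are made up, and key 
handling plus re-buffering of elements still ahead of the watermark are elided.)

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TimestampedValue;

// Buffers elements per key and re-emits them in event-time order once the
// watermark has passed their timestamps.
class SortByEventTimeFn extends DoFn<KV<String, Long>, Long> {

  @StateId("buffer")
  private final StateSpec<BagState<TimestampedValue<Long>>> bufferSpec = StateSpecs.bag();

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      ProcessContext ctx,
      @StateId("buffer") BagState<TimestampedValue<Long>> buffer,
      @TimerId("flush") Timer flush) {
    buffer.add(TimestampedValue.of(ctx.element().getValue(), ctx.timestamp()));
    // Wake up when the watermark reaches this element's timestamp; a real
    // implementation would keep the timer at the minimum buffered timestamp.
    flush.set(ctx.timestamp());
  }

  @OnTimer("flush")
  public void onFlush(
      OnTimerContext ctx,
      @StateId("buffer") BagState<TimestampedValue<Long>> buffer) {
    List<TimestampedValue<Long>> sorted = new ArrayList<>();
    buffer.read().forEach(sorted::add);
    sorted.sort(Comparator.comparing(TimestampedValue::getTimestamp));
    for (TimestampedValue<Long> tv : sorted) {
      // Everything at or below the firing timestamp is now safe to emit in order.
      if (!tv.getTimestamp().isAfter(ctx.timestamp())) {
        ctx.outputWithTimestamp(tv.getValue(), tv.getTimestamp());
      }
    }
    buffer.clear(); // a real implementation would re-add elements not yet emitted
  }
}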


Maybe someone can help me out here?

Jan

On 11/24/19 5:05 AM, Kenneth Knowles wrote:
I don't actually see how event time sorting simplifies this case much. 
You still need to buffer elements until they can no longer be matched 
in the join, and you still need to query that buffer for elements that 
might match. The general "bi-temporal join" (without sorting) requires 
one new state type and then it has identical API, does not require any 
novel data structures or reasoning, yields better latency (no sort 
buffer delay), and discards less data (no sort buffer cutoff; 
watermark is better). Perhaps a design document about this specific 
case would clarify.


Kenn

On Fri, Nov 22, 2019 at 10:08 PM Jan Lukavský wrote:


I didn't want to go too much into detail, but to describe the idea
roughly (ignoring the problem of different window fns on both
sides to keep it as simple as possible):

rhs ---\
        flatten (on global window) ---> stateful ParDo (sorted by event time) ---> output
lhs ---/

If we can guarantee event-time-ordered arrival of events into the
stateful ParDo, then the whole complexity reduces to keeping the current
value of the left and right element and flushing them out each time
there is an update. That is, the "knob" is actually the watermark
moving, because that is what tells the join operation that there will
be no more (non-late) input. This is very, very simplified, but it
depicts the solution. The "classical" windowed join reduces to
this if all data in each window is projected onto the window end
boundary; then there will be a Cartesian product, because all the
elements have the same timestamp. I can put this into a design doc
with all the details; I was mainly trying to find out if there is or was
any effort around this.
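
(To make the reduction concrete, here is a rough sketch of the stateful ParDo
body, under the assumption that elements really do arrive in event-time order;
the "lhs"/"rhs" tagging after the flatten, the class name, and all windowing
and cleanup concerns are made up or elided.)

import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Input: key -> (side tag, value), where the side is "lhs" or "rhs" after the
// flatten. With event-time ordered input, the join state degenerates to the
// latest value seen on each side; every update flushes the current pair.
class OrderedJoinFn extends DoFn<KV<String, KV<String, Long>>, KV<String, KV<Long, Long>>> {

  @StateId("left")
  private final StateSpec<ValueState<Long>> leftSpec = StateSpecs.value();

  @StateId("right")
  private final StateSpec<ValueState<Long>> rightSpec = StateSpecs.value();

  @ProcessElement
  public void process(
      ProcessContext ctx,
      @StateId("left") ValueState<Long> left,
      @StateId("right") ValueState<Long> right) {
    String side = ctx.element().getValue().getKey();
    Long value = ctx.element().getValue().getValue();
    if ("lhs".equals(side)) {
      left.write(value);
    } else {
      right.write(value);
    }
    Long l = left.read();
    Long r = right.read();
    if (l != null && r != null) {
      // The watermark is the signal that no earlier (non-late) element can
      // still arrive and invalidate this pair.
      ctx.output(KV.of(ctx.element().getKey(), KV.of(l, r)));
    }
  }
}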

I was in touch with Reza on PR #9032; I think it currently
suffers from problems when running on batch.

I think I can even (partly) resolve the retraction issue (for
joins), as described in the thread [1]. In short, there can be two
copies of the stateful DoFn, one running at the watermark and the
other at (watermark - allowed lateness). One would produce ON_TIME
(possibly wrong) results, the other LATE but correct
ones. By comparing the two, it would be possible to retract the
wrong results.

Yes, this is also about providing more evidence of why I think
event-time sorting should be (somehow) part of the model. :-)

Jan

[1]

https://lists.apache.org/thread.html/a736ebd6b0913d2e06dfa54797716d022adf2c45140530d5c3bd538d@%3Cdev.beam.apache.org%3E

On 11/23/19 5:54 AM, Kenneth Knowles wrote:

+Mikhail Gryzykhin +Rui Wang +Reza Rokni, who have all done some investigations here.


On Fri, Nov 22, 2019 at 11:48 AM Jan Lukavský <je...@seznam.cz> wrote:


On 11/22/19 7:54 PM, Reuven Lax wrote:



On Fri, Nov 22, 2019 at 10:19 AM Jan Lukavský
<je...@seznam.cz> wrote:

Hi Reuven,

I didn't investigate that particular one, but looking
into that now

Re: [VOTE] Beam Mascot animal choice: vote for as many as you want

2019-11-24 Thread Matthias Baetens
In case I'm not too late:

[ ] Beaver
[ ] Hedgehog
[ ] Lemur
[ ] Owl
[ ] Salmon
[ ] Trout
[ ] Robot dinosaur
[X ] Firefly
[ ] Cuttlefish
[X ] Dumbo Octopus
[ ] Angler fish

I like angler fish a lot, but I think no one will join any meetups since
they're scary as hell haha


On Sun, Nov 24, 2019, 04:27 Kenneth Knowles  wrote:

> David - if you can reconfigure the form so it is not anonymous (at least
> to me) then I may be up for including those results in the tally. I don't
> want to penalize those who voted via the form. But since there are now two
> voting channels we have to dedupe or discard the form results. And I need
> to be able to see which votes are PMC. Even if advisory, it does need to
> move to a concluding vote, and PMC votes could be a tiebreaker of sorts.
>
> Kenn
>
> On Sat, Nov 23, 2019 at 7:17 PM Kenneth Knowles  wrote:
>
>> On Fri, Nov 22, 2019 at 10:24 AM Robert Bradshaw 
>> wrote:
>>
>>> On Thu, Nov 21, 2019 at 7:05 PM David Cavazos 
>>> wrote:
>>>


 I created this Google Form
 
 if everyone is okay with it to make it easier to both vote and view the
 results :)

>>>
>>> Generally, decisions, especially votes, for Apache projects are supposed
>>> to happen on-list. I suppose this is more of an advisory vote, but it still
>>> probably makes sense to keep it here.
>>>
>>
>> Indeed. Someone suggested a Google form before I started this, but I
>> deliberately didn't use it. It doesn't add much and it puts the vote off
>> list onto opaque and mutable third party infrastructure.
>>
>> If you voted on the form, please repeat it on thread so I can count it.
>>
>> Kenn
>>
>>
>>
>>> import collections, pprint, re, requests
>>>
>>> thread = requests.get('https://lists.apache.org/api/thread.lua?id=ff60eabbf8349ba6951633869000356c2c2feb48bbff187cf3c60039@%3Cdev.beam.apache.org%3E').json()
>>> counts = collections.defaultdict(int)
>>> for email in thread['emails']:
>>>   body = requests.get('https://lists.apache.org/api/email.lua?id=%s' %
>>>                       email['mid']).json()['body']
>>>   for vote in re.findall(r'\n\s*\[\s*[xX]\s*\]\s*([a-zA-Z ]+)', body):
>>>     counts[vote] += 1
>>> # Print the totals once, after all emails have been tallied.
>>> pprint.pprint(sorted(counts.items(), key=lambda kv: kv[-1]))
>>>
>>> ...
>>>
>>> [('Beaver', 1),
>>>  ('Capybara', 2),
>>>  ('Trout', 2),
>>>  ('Salmon', 4),
>>>  ('Dumbo Octopus', 7),
>>>  ('Robot dinosaur', 9),
>>>  ('Hedgehog', 10),
>>>  ('Cuttlefish', 11),
>>>  ('Angler fish', 12),
>>>  ('Lemur', 14),
>>>  ('Owl', 15),
>>>  ('Firefly', 17)]
>>>

 On Thu, Nov 21, 2019 at 6:18 PM Vinay Mayar 
 wrote:

> [ ] Beaver
> [ ] Hedgehog
> [ ] Lemur
> [ ] Owl
> [ ] Salmon
> [ ] Trout
> [ ] Robot dinosaur
> [ ] Firefly
> [ ] Cuttlefish
> [x] Dumbo Octopus
> [ ] Angler fish
>
> On Thu, Nov 21, 2019 at 6:14 PM Chamikara Jayalath <
> chamik...@google.com> wrote:
>
>> [X] Beaver
>> [ ] Hedgehog
>> [ ] Lemur
>> [X] Owl
>> [ ] Salmon
>> [ ] Trout
>> [ ] Robot dinosaur
>> [ ] Firefly
>> [X ] Cuttlefish
>> [X ] Dumbo Octopus
>> [ X] Angler fish
>>
>> Thanks,
>> Cham
>>
>> On Thu, Nov 21, 2019 at 1:43 PM Michał Walenia <
>> michal.wale...@polidea.com> wrote:
>>
>>> [X] Beaver
>>> [ ] Hedgehog
>>> [X] Lemur
>>> [X] Owl
>>> [ ] Salmon
>>> [ ] Trout
>>> [X] Robot dinosaur
>>> [X] Firefly
>>> [ ] Cuttlefish
>>> [ ] Dumbo Octopus
>>> [ ] Angler fish
>>>
>>> On Thu, Nov 21, 2019 at 1:11 PM Aizhamal Nurmamat kyzy <
>>> aizha...@apache.org> wrote:
>>>
 [ ] Beaver
 [X] Hedgehog
 [ ] Lemur
 [ ] Owl
 [ ] Salmon
 [ ] Trout
 [ ] Robot dinosaur
 [ ] Firefly
 [X] Cuttlefish
 [ ] Dumbo Octopus
 [ ] Angler fish

 On Thu, Nov 21, 2019 at 11:21 AM Robert Burke 
 wrote:

> [ X] Beaver
> [] Hedgehog
> [ x] Lemur
> [ X] Owl
> [ ] Salmon
> [ ] Trout
> [ ] Robot dinosaur
> [X ] Firefly
> [ X] Cuttlefish
> [x ] Dumbo Octopus
> [X ] Angler fish
>
> On Thu, Nov 21, 2019, 9:33 AM Łukasz Gajowy <
> lukasz.gaj...@gmail.com> wrote:
>
>> [ ] Beaver
>> [ ] Hedgehog
>> [x] Lemur
>> [x] Owl
>> [ ] Salmon
>> [ ] Trout
>> [x] Robot dinosaur!
>> [ ] Firefly
>> [ ] Cuttlefish
>> [ ] Dumbo Octopus
>> [ ] Angler fish
>>
>> On Thu, Nov 21, 2019 at 00:44 Augustin Lafanechere <
>> augustin.lafanech...@kapten.com> wrote:
>>
>>> [ ] Beaver
>>> [ ] Hedgehog
>>> [ ] Lemur
>>> [ ] Owl
>>> [x] Salmon

Re: Portable runner bundle scheduling (Streaming/Python/Flink)

2019-11-24 Thread Maximilian Michels
Load-balancing the worker selection for bundle execution sounds like the 
solution to uneven work distribution across the workers. Some comments:


(1) I could imagine that in the case of long-running bundle execution (e.g. 
model execution), this could stall upstream operators, because their busy 
downstream operators hold all available workers, which would also hurt the 
pipeline's throughput/latency.


Instead of balancing across _all_ the workers available on a particular 
node (aka TaskManager), it could make sense to just increase the share 
of SDK workers for a particular executable stage. At the moment, each 
stage receives a single worker. Instead, it could receive a higher 
share of workers, which could either be exclusive or overlap with the 
share of another executable stage. Essentially, this is an extension of 
what you are proposing, to ensure that stages make progress.


(2) Another concern is that load balancing across multiple worker 
instances would render state caching useless. We would need to make the Runner 
aware of it so that it can turn off state caching. With the approach 
of multiple workers per stage from (1), it would also be possible to keep 
state caching if we divided the key range across the workers.
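
(A toy sketch of that idea, purely illustrative and not existing runner code: 
route bundles within a stage's share of workers by key range, so a given key 
always lands on the same worker and its state cache stays valid. All names 
are made up.)

import java.util.Arrays;
import java.util.List;

// Illustrative only: deterministic key-range routing within the share of SDK
// workers assigned to one executable stage, so state caching remains valid.
class KeyRangeWorkerSelector {

  private final List<String> stageWorkers; // this stage's share of workers

  KeyRangeWorkerSelector(List<String> stageWorkers) {
    this.stageWorkers = stageWorkers;
  }

  // The same encoded key always maps to the same worker in the share.
  String workerFor(byte[] encodedKey) {
    int index = Math.floorMod(Arrays.hashCode(encodedKey), stageWorkers.size());
    return stageWorkers.get(index);
  }
}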


Cheers,
Max

On 23.11.19 18:42, Thomas Weise wrote:

JIRA: https://issues.apache.org/jira/browse/BEAM-8816


On Thu, Nov 21, 2019 at 10:44 AM Thomas Weise wrote:


Hi Luke,

Thanks for the background and it is exciting to see the progress on
the SDF side. It will help with this use case and many other
challenges. I imagine the Python user code would be able to
determine that it is bogged down with high latency record processing
(based on the duration it actually took to process previous records)
and opt to send back remaining work to the runner.

Until the Flink runner supports reassignment of work, I'm planning
to implement the simple bundle distribution approach referred to
before. We will test it in our environment and contribute it back if
the results are good.

Thomas



On Wed, Nov 20, 2019 at 11:34 AM Luke Cwik <lc...@google.com> wrote:

Dataflow has run into this issue as well. Dataflow has "work
items" that are converted into bundles that are executed on the
SDK. Each work item does a greedy assignment to the SDK worker
with the fewest work items assigned. As you surmised, we use SDF
splitting in batch pipelines to balance work. We would like to
use splitting of SDFs in streaming pipelines as well but
Dataflow can't handle it as of right now.
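
(Not Dataflow's actual code, just a toy illustration of the greedy
"fewest work items in flight" policy described above; all names are made up.)

import java.util.HashMap;
import java.util.Map;

// Illustrative only: each new work item is assigned to the SDK worker with
// the fewest work items currently in flight.
class LeastLoadedAssigner {

  private final Map<String, Integer> inFlight = new HashMap<>();

  LeastLoadedAssigner(Iterable<String> workers) {
    for (String worker : workers) {
      inFlight.put(worker, 0);
    }
  }

  synchronized String assign() {
    String best = null;
    for (Map.Entry<String, Integer> entry : inFlight.entrySet()) {
      if (best == null || entry.getValue() < inFlight.get(best)) {
        best = entry.getKey();
      }
    }
    inFlight.merge(best, 1, Integer::sum);
    return best;
  }

  synchronized void complete(String worker) {
    inFlight.merge(worker, -1, Integer::sum);
  }
}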

As part of a few PRs, I have added basic SDF expansion to the
shared runner lib and slowly exposed the runner-side hooks [2, 3]
for SDK-initiated checkpointing and bundle finalization. There
are still a few pieces left:
* exposing an API so the bundle can be split during execution
* adding the limited depth splitting logic that would add a
basic form of dynamic work rebalancing for all runners that
decide to use it

1: https://github.com/apache/beam/pull/10045
2: https://github.com/apache/beam/pull/10065
3: https://github.com/apache/beam/pull/10074

On Wed, Nov 20, 2019 at 10:49 AM Thomas Weise <t...@apache.org> wrote:

We found a problem with uneven utilization of SDK workers
causing excessive latency with Streaming/Python/Flink.
Remember that with Python, we need to execute multiple
worker processes on a machine instead of relying on threads
in a single worker, which requires the runner to decide
which worker to give a bundle to for processing.

The Flink runner has knobs to influence the number of
records per bundle and the maximum duration for a bundle.
But since the runner does not understand the cost of an
individual record, it is possible that the duration of
bundles fluctuates significantly due to the skew in
processing time of individual records. And unless the bundle
size is 1, multiple expensive records could be allocated to
a single bundle before the cutoff time is reached. We notice
this with a pipeline that executes models, but there are
other use cases where the cost of individual records can
vary significantly.
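
(A toy sketch of the two knobs mentioned above, a count-based and a
duration-based cut-off; it also shows why an expensive record can push a
bundle well past the limit: the check only happens between records. The
class and method names are made up.)

// Illustrative only: a bundle is cut off when either the element count or the
// elapsed wall-clock time limit is reached. Because the time check runs only
// between records, one slow record can still blow past the time limit.
class BundleCutoff {

  private final int maxBundleSize;
  private final long maxBundleTimeMillis;
  private int count = 0;
  private final long startMillis = System.currentTimeMillis();

  BundleCutoff(int maxBundleSize, long maxBundleTimeMillis) {
    this.maxBundleSize = maxBundleSize;
    this.maxBundleTimeMillis = maxBundleTimeMillis;
  }

  // Called after each record; returns true when the bundle should be closed.
  boolean recordProcessedAndShouldClose() {
    count++;
    return count >= maxBundleSize
        || System.currentTimeMillis() - startMillis >= maxBundleTimeMillis;
  }
}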

Additionally, the Flink runner establishes the association
between the subtask managing an executable stage and the SDK
worker during initialization, lasting for the duration of
the job. In other words, bundles for the same executable
stage will always be sent to the same SDK worker. When the
execution time skew is tied to specific keys (stateful
processing), it further aggravates th