Some more thoughts:
As it says on the DirectRunner [1] page, the DirectRunner is meant to
check that users don't rely on semantics that are not guaranteed by the
Beam model.
Programs that rely on the Flink runner deep cloning the inputs between
each operator in the pipeline is relying on a semantic that is not
guaranteed by the Beam model, and those pipelines would fail if ran on
the DirectRunner.
As I stated in the previous email, I have some example programs that
return different outputs on the Flink runner and on the DirectRunner. I
have not tested these programs on other runners, so I don't know what
they would return. If they return different answers than the
DirectRunner, I'm inclined to say that the DirectRunner should either be
changed, or the runners be changed.
From my very limited point of view, the Flink runner seems to be
spending a lot of extra time implementing a semantic guarantee that the
Beam model explicitly doesn't support.
Best regards,
Teodor Spæren
[1]: https://beam.apache.org/documentation/runners/direct/
On Tue, Oct 27, 2020 at 12:08:51PM +0100, Teodor Spæren wrote:
Hey David,
I think I might have worded this poorly, because what I meant is that
from what I can see in [1], the BEAM model explicitly states that
PCollections should be treated as immutable. The direct runner also
tests for this. Do the other runners also protect the user from
misusing the system so? If not we have a situation where running the
same pipeline on two different runners will yield different answers. I
can show some examples that return different examples for the Flink
and the Direct Runner.
I agree that a breaking existing pipelines is a no-no, but I do think
that we could simply gate this behind an option on the Flink runner.
I also tried to search for this before, but did not find any mention
of it, can you link me to some discussions about this in the past?
Thanks for reply :D
Best regards,
Teodor Spæren
[1]: https://beam.apache.org/documentation/programming-guide/#immutability
On Tue, Oct 27, 2020 at 11:49:45AM +0100, David Morávek wrote:
Hi Teodor,
Thanks for bringing this up. This is a known, long standing "issue".
Unfortunately there are few things we need to consider:
- As you correctly noted, the *Beam model doesn't enforce immutability* of
input / output elements, so this is the price.
- We* can not break *existing pipelines.
- Flink Runner needs to provide the *same guarantees as the Beam model*.
There are definitely some things we can do here, to make things faster:
- We can try the similar approach as HadoopIO
(HadoopInputFormatReader#isKnownImmutable), to check for known immutable
types (KV, primitives, protobuf, other known internal immutable structures).
-* If the type is immutable, we can safely reuse it.* This should cover
most of the performance costs without breaking the guarantees Beam model
provides.
- We can enable registration of custom "immutable" types via pipeline
options? (this may be an unnecessary knob, so this needs a further
discussion)
WDYT?
D.
On Mon, Oct 26, 2020 at 6:37 PM Teodor Spæren <[email protected]>
wrote:
Hey!
I'm a student at the University of Oslo, and I'm writing a master thesis
about the possibility of using Beam to benchmark stream processing
systems. An important factor in this is the overhead associated with
using Beam over writing code for the runner directly. [1] found that
there was a large overhead associated with using Beam, but did not
investigate where this overhead came from. I've done benchmarks and
confirmed the findings there, where for simple chains of identity
operators, Beam is 43x times slower than the Flink equivalent.
These are very simple pipelines, with custom sources that just output a
series of integers. By profiling I've found that most of the overhead
comes from serializing and deserializing. Specifically the way
TypeSerializer's, [2], is implemented in [3], where each object is
serialized and then deserialized between every operator. Looking into
the semantics of Beam, no operator should change the input, so we don't
need to do a copy here. The function in [3] could potentially be changed
to a single `return` statement.
Doing this removes 80% of the overhead in my tests. This is a very
synthetic example, but it's a low hanging fruit and might give a speed
boost to many pipelines when run on the Flink runnner. I would like to
make this my first contribution to Beam, but as the guide [4] says, I
thought I'd ask here first to see if there a is a reason not to do this.
Only objection I can see, is that it might break existing pipelines
which rely on the Flink runner saving them from not following the
immutability guarantee. I see this as a small loss as they are relying
on an implementation detail of the Flink runner.
I hope I have explained this adequately and eagerly away any feedback :)
Best regards,
Teodor Spæren
[1]: https://arxiv.org/abs/1907.08302
[2]:
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
[3]:
https://github.com/apache/beam/blob/master/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java#L84
[4]: https://beam.apache.org/contribute/