Hi Teodor,

Thanks for bringing this up. This is a known, long-standing "issue".
Unfortunately, there are a few things we need to consider:

- As you correctly noted, the *Beam model doesn't enforce immutability* of
input / output elements, so this is the price.
- We *cannot break* existing pipelines.
- Flink Runner needs to provide the *same guarantees as the Beam model*.

There are definitely some things we can do here, to make things faster:

- We can try an approach similar to HadoopIO's
(HadoopInputFormatReader#isKnownImmutable) and check for known immutable
types (KV, primitives, protobuf, other known internal immutable structures).
- *If the type is immutable, we can safely reuse it.* This should cover
most of the performance costs without breaking the guarantees the Beam model
provides.
- We could enable registration of custom "immutable" types via pipeline
options (this may be an unnecessary knob, so it needs further
discussion).
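
To make the first two points concrete, here is a minimal sketch of the idea,
not actual Beam or Flink code. The class name, the allow-list, and the use of
Java serialization as a stand-in for the coder round trip are all assumptions
made for illustration; a real change would live in CoderTypeSerializer and go
through the element's Coder.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Set;

/** Hypothetical helper for illustration only; not part of Beam or Flink. */
final class ImmutableAwareCopier {

  // Illustrative allow-list. A real one would also need to cover the other
  // types mentioned above (KV, protobuf messages, internal structures).
  private static final Set<Class<?>> KNOWN_IMMUTABLE =
      Set.of(String.class, Integer.class, Long.class, Double.class, Boolean.class);

  static boolean isKnownImmutable(Object value) {
    return value == null || KNOWN_IMMUTABLE.contains(value.getClass());
  }

  @SuppressWarnings("unchecked")
  static <T extends Serializable> T copy(T value) {
    if (isKnownImmutable(value)) {
      // Nobody can mutate this instance, so handing it on is safe.
      return value;
    }
    // Fallback: defensive copy through a serialize/deserialize round trip.
    // Java serialization is used here only as a stand-in for the coder.
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(value);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (T) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("Could not copy element", e);
    }
  }
}
```

With this shape, existing pipelines that mutate elements of unknown types
still get a fresh copy, while elements of allow-listed types skip the round
trip entirely.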

WDYT?

D.


On Mon, Oct 26, 2020 at 6:37 PM Teodor Spæren <teodor_spae...@riseup.net>
wrote:

> Hey!
>
> I'm a student at the University of Oslo, and I'm writing a master thesis
> about the possibility of using Beam to benchmark stream processing
> systems. An important factor in this is the overhead associated with
> using Beam over writing code for the runner directly. [1] found that
> there was a large overhead associated with using Beam, but did not
> investigate where this overhead came from. I've done benchmarks and
> confirmed the findings there, where for simple chains of identity
> operators, Beam is 43x slower than the Flink equivalent.
>
> These are very simple pipelines, with custom sources that just output a
> series of integers. By profiling I've found that most of the overhead
> comes from serializing and deserializing, specifically from the way
> TypeSerializer [2] is implemented in [3], where each object is
> serialized and then deserialized between every operator. Looking into
> the semantics of Beam, no operator should change the input, so we don't
> need to do a copy here. The function in [3] could potentially be changed
> to a single `return` statement.
>
> Doing this removes 80% of the overhead in my tests. This is a very
> synthetic example, but it's low-hanging fruit and might give a speed
> boost to many pipelines when run on the Flink runner. I would like to
> make this my first contribution to Beam, but as the guide [4] says, I
> thought I'd ask here first to see if there is a reason not to do this.
>
> The only objection I can see is that it might break existing pipelines
> which rely on the Flink runner saving them from not following the
> immutability guarantee. I see this as a small loss as they are relying
> on an implementation detail of the Flink runner.
>
> I hope I have explained this adequately and eagerly await any feedback :)
>
> Best regards,
> Teodor Spæren
>
> [1]: https://arxiv.org/abs/1907.08302
> [2]:
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
> [3]:
> https://github.com/apache/beam/blob/master/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java#L84
> [4]: https://beam.apache.org/contribute/
>
