Actually I believe that the Beam model does say that input elements should
be immutable. If I remember correctly, the DirectRunner even validates this
in unit tests, failing tests if the input elements have been mutated.
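
As a concrete illustration, here is a minimal sketch of the kind of
mutation that enforcement is meant to catch. The option name
setEnforceImmutability on DirectOptions and the exact failure behaviour
are from memory, so treat those details as assumptions rather than a
reference:

  import org.apache.beam.runners.direct.DirectOptions;
  import org.apache.beam.sdk.Pipeline;
  import org.apache.beam.sdk.options.PipelineOptionsFactory;
  import org.apache.beam.sdk.transforms.Create;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.ParDo;

  public class MutatingDoFnExample {
    public static void main(String[] args) {
      DirectOptions options =
          PipelineOptionsFactory.create().as(DirectOptions.class);
      // Believed to be on by default for the DirectRunner.
      options.setEnforceImmutability(true);

      Pipeline p = Pipeline.create(options);
      p.apply(Create.of(new byte[] {1, 2, 3}, new byte[] {4, 5, 6}))
          .apply(ParDo.of(new DoFn<byte[], Integer>() {
            @ProcessElement
            public void process(@Element byte[] element,
                OutputReceiver<Integer> out) {
              // Mutating the input element in place is forbidden by the
              // Beam model; the DirectRunner compares the encoded element
              // before and after processing and fails the run.
              element[0] = 42;
              out.output((int) element[0]);
            }
          }));

      p.run().waitUntilFinish();
    }
  }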

On Tue, Oct 27, 2020 at 3:49 AM David Morávek <d...@apache.org> wrote:

> Hi Teodor,
>
> Thanks for bringing this up. This is a known, long-standing "issue".
> Unfortunately, there are a few things we need to consider:
>
> - As you correctly noted, the *Beam model doesn't enforce immutability*
> of input / output elements, so this is the price.
> - We *cannot break* existing pipelines.
> - Flink Runner needs to provide the *same guarantees as the Beam model*.
>
> There are definitely some things we can do here, to make things faster:
>
> - We can try a similar approach to HadoopIO
> (HadoopInputFormatReader#isKnownImmutable) and check for known immutable
> types (KV, primitives, protobuf, other known internal immutable structures).
> - *If the type is immutable, we can safely reuse it.* This should cover
> most of the performance cost without breaking the guarantees the Beam
> model provides (see the sketch below this list).
> - We could enable registration of custom "immutable" types via pipeline
> options? (this may be an unnecessary knob, so it needs further
> discussion)
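>
> To make the first two points concrete, here is a rough sketch of what such
> a check could look like inside CoderTypeSerializer#copy. The helper name
> isKnownImmutable, the type list, and the reuse of the class's existing
> coder field are illustrative assumptions, not an existing Beam API, and
> the real method is structured somewhat differently:
>
>   // Illustrative additions to CoderTypeSerializer<T>; imports shown for
>   // completeness.
>   import java.math.BigDecimal;
>   import java.math.BigInteger;
>   import java.util.Arrays;
>   import java.util.HashSet;
>   import java.util.Set;
>
>   import org.apache.beam.sdk.coders.CoderException;
>   import org.apache.beam.sdk.util.CoderUtils;
>   import org.apache.beam.sdk.values.KV;
>
>   private static final Set<Class<?>> KNOWN_IMMUTABLE_TYPES =
>       new HashSet<>(Arrays.asList(
>           String.class, Boolean.class, Byte.class, Short.class,
>           Character.class, Integer.class, Long.class, Float.class,
>           Double.class, BigInteger.class, BigDecimal.class));
>
>   private static boolean isKnownImmutable(Object value) {
>     if (value == null || KNOWN_IMMUTABLE_TYPES.contains(value.getClass())) {
>       return true;
>     }
>     if (value instanceof KV) {
>       // KV itself is immutable; reuse it only if both sides are, too.
>       KV<?, ?> kv = (KV<?, ?>) value;
>       return isKnownImmutable(kv.getKey())
>           && isKnownImmutable(kv.getValue());
>     }
>     // Protobuf messages and other known immutable types could go here.
>     return false;
>   }
>
>   @Override
>   public T copy(T t) {
>     if (isKnownImmutable(t)) {
>       return t; // safe to reuse, skips the serialize/deserialize round trip
>     }
>     try {
>       // Fall back to today's behaviour: clone the element via the coder.
>       return CoderUtils.clone(coder, t);
>     } catch (CoderException e) {
>       throw new RuntimeException("Could not clone element.", e);
>     }
>   }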
>
> WDYT?
>
> D.
>
>
> On Mon, Oct 26, 2020 at 6:37 PM Teodor Spæren <teodor_spae...@riseup.net>
> wrote:
>
>> Hey!
>>
>> I'm a student at the University of Oslo, and I'm writing a master's thesis
>> about the possibility of using Beam to benchmark stream processing
>> systems. An important factor in this is the overhead associated with
>> using Beam over writing code for the runner directly. [1] found that
>> there was a large overhead associated with using Beam, but did not
>> investigate where this overhead came from. I've run my own benchmarks and
>> confirmed those findings: for simple chains of identity operators, Beam is
>> 43x slower than the equivalent native Flink job.
>>
>> These are very simple pipelines, with custom sources that just output a
>> series of integers. By profiling I've found that most of the overhead
>> comes from serialization and deserialization. Specifically, the
>> TypeSerializer [2] implementation in [3] serializes and then deserializes
>> each object between every operator. Looking at the semantics of Beam, no
>> operator should mutate its input, so we don't need to make a copy here.
>> The copy function in [3] could potentially be reduced to a single
>> `return` statement.
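>>
>> To illustrate, the copy path in [3] behaves roughly like the commented-out
>> line below (simplified from memory; the real code is structured a bit
>> differently), and the proposal is to reduce it to a plain return:
>>
>>   @Override
>>   public T copy(T t) {
>>     // Today (roughly): produce a defensive copy by round-tripping the
>>     // element through the Beam coder,
>>     //   return CoderUtils.clone(coder, t);
>>     // i.e. a full serialize + deserialize per element, per operator edge.
>>     // If operators never mutate their input, the copy is unnecessary:
>>     return t;
>>   }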
>>
>> Doing this removes 80% of the overhead in my tests. This is a very
>> synthetic example, but it's low-hanging fruit and might give a speed
>> boost to many pipelines when run on the Flink runner. I would like to
>> make this my first contribution to Beam, but as the contribution guide
>> [4] suggests, I thought I'd ask here first to see if there is a reason
>> not to do this.
>>
>> The only objection I can see is that it might break existing pipelines
>> that rely on the Flink runner to save them from violating the
>> immutability guarantee. I see this as a small loss, since they are
>> relying on an implementation detail of the Flink runner.
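>>
>> For example, a DoFn along these lines is already incorrect under the Beam
>> model, but today it only happens to work on the Flink runner because
>> every element is copied between operators (an illustrative sketch, not
>> taken from a real pipeline):
>>
>>   import java.util.List;
>>   import org.apache.beam.sdk.transforms.DoFn;
>>
>>   // Mutates its input element in place, which the Beam model forbids.
>>   // Without the Flink runner's defensive copy, another consumer of the
>>   // same PCollection could observe the mutated list.
>>   static class PadWithZeroFn extends DoFn<List<Integer>, List<Integer>> {
>>     @ProcessElement
>>     public void process(@Element List<Integer> element,
>>         OutputReceiver<List<Integer>> out) {
>>       element.add(0);      // in-place mutation of the input element
>>       out.output(element); // re-emits the same object instead of a copy
>>     }
>>   }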
>>
>> I hope I have explained this adequately, and I eagerly await any feedback :)
>>
>> Best regards,
>> Teodor Spæren
>>
>> [1]: https://arxiv.org/abs/1907.08302
>> [2]:
>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
>> [3]:
>> https://github.com/apache/beam/blob/master/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java#L84
>> [4]: https://beam.apache.org/contribute/
>>
>
