Hi Teodor,

Thanks for bringing this up. This is a known, long-standing "issue". Unfortunately, there are a few things we need to consider:
- As you correctly noted, the *Beam model doesn't enforce immutability* of input / output elements, so this is the price.
- We *cannot break* existing pipelines.
- The Flink Runner needs to provide the *same guarantees as the Beam model*.

There are definitely some things we can do here to make things faster:
- We can try a similar approach to HadoopIO (HadoopInputFormatReader#isKnownImmutable) and check for known immutable types (KV, primitives, protobuf, other known internal immutable structures).
- *If the type is immutable, we can safely reuse it.* This should cover most of the performance cost without breaking the guarantees the Beam model provides.
- We could also enable registration of custom "immutable" types via pipeline options? (This may be an unnecessary knob, so it needs further discussion.)

WDYT?

D.

On Mon, Oct 26, 2020 at 6:37 PM Teodor Spæren <teodor_spae...@riseup.net> wrote:
> Hey!
>
> I'm a student at the University of Oslo, and I'm writing a master's thesis
> about the possibility of using Beam to benchmark stream processing
> systems. An important factor in this is the overhead associated with
> using Beam over writing code for the runner directly. [1] found that
> there was a large overhead associated with using Beam, but did not
> investigate where this overhead came from. I've done benchmarks and
> confirmed the findings there: for simple chains of identity
> operators, Beam is 43 times slower than the Flink equivalent.
>
> These are very simple pipelines, with custom sources that just output a
> series of integers. By profiling I've found that most of the overhead
> comes from serializing and deserializing. Specifically, it comes from the
> way TypeSerializer [2] is implemented in [3]: each object is
> serialized and then deserialized between every operator. Looking into
> the semantics of Beam, no operator should change the input, so we don't
> need to do a copy here.
> The function in [3] could potentially be changed to a single `return`
> statement.
>
> Doing this removes 80% of the overhead in my tests. This is a very
> synthetic example, but it's low-hanging fruit and might give a speed
> boost to many pipelines when run on the Flink runner. I would like to
> make this my first contribution to Beam, but as the guide [4] says, I
> thought I'd ask here first to see if there is a reason not to do this.
>
> The only objection I can see is that it might break existing pipelines
> that rely on the Flink runner to protect them when they don't follow the
> immutability guarantee. I see this as a small loss, since they are
> relying on an implementation detail of the Flink runner.
>
> I hope I have explained this adequately, and I eagerly await any feedback :)
>
> Best regards,
> Teodor Spæren
>
> [1]: https://arxiv.org/abs/1907.08302
> [2]: https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
> [3]: https://github.com/apache/beam/blob/master/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java#L84
> [4]: https://beam.apache.org/contribute/
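A minimal, hypothetical sketch of how the two ideas in this thread could fit together: keep the single-`return` fast path from the quoted proposal, but only for values whose type is known to be immutable (the HadoopIO-style check suggested in the reply), and fall back to a defensive deep copy for everything else. This is not the real Beam or Flink API. The class name `ImmutableAwareCopier` and its allow-list are illustrative assumptions, and plain Java serialization stands in for the Coder round trip that CoderTypeSerializer performs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Set;

/**
 * Hypothetical sketch (not a Beam or Flink API): copy() reuses values whose type is
 * known to be immutable and deep-copies everything else. Java serialization stands
 * in for the Coder encode/decode round trip.
 */
final class ImmutableAwareCopier {

  // Illustrative allow-list of immutable JDK types, matched by exact class so that
  // (possibly mutable) subclasses never qualify. A real check might also accept
  // protobuf messages, KVs whose key and value are immutable, or user-registered
  // types; those are left out to keep the sketch dependency-free.
  private static final Set<Class<?>> KNOWN_IMMUTABLE = Set.of(
      String.class, Boolean.class, Byte.class, Short.class, Integer.class,
      Long.class, Float.class, Double.class, Character.class,
      BigInteger.class, BigDecimal.class);

  private ImmutableAwareCopier() {}

  /** Returns true if no operator could observe a mutation of this value. */
  static boolean isKnownImmutable(Object value) {
    return value == null || KNOWN_IMMUTABLE.contains(value.getClass());
  }

  /** Fast path: return the input as-is. Slow path: serialize and deserialize it. */
  @SuppressWarnings("unchecked")
  static <T extends Serializable> T copy(T value) {
    if (isKnownImmutable(value)) {
      return value; // sharing is safe, so skip the round trip entirely
    }
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(value);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (T) in.readObject();
      }
    } catch (IOException e) {
      throw new UncheckedIOException("Could not clone value", e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException("Could not clone value", e);
    }
  }
}
```

With this shape, an `Integer` element is handed to the next operator unchanged, while an `ArrayList` still gets a fresh copy, so a downstream operator that (incorrectly) mutates it cannot affect upstream state. A real implementation would more likely make the immutability decision once per coder, when the serializer is created, rather than inspecting each element's class; the per-element check here just keeps the sketch small.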