Actually I believe that the Beam model does say that input elements should be immutable. If I remember correctly, the DirectRunner even validates this in unit tests, failing tests if the input elements have been mutated.
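To make the "validates this in unit tests" idea concrete, here is a minimal sketch of how a runner can detect mutation of an input element: encode the element to bytes before handing it to user code, re-encode it afterwards, and compare. This is only an illustration of the technique (in Beam the encoding role is played by the element's `Coder`); the class and method names here are hypothetical, not Beam's actual API.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class MutationCheck {
    // Encode the element before and after user code runs; any difference
    // in the encoded bytes means the element was mutated in place.
    static <T> boolean wasMutated(T element, Function<T, byte[]> encode, Runnable processElement) {
        byte[] before = encode.apply(element);
        processElement.run();
        byte[] after = encode.apply(element);
        return !Arrays.equals(before, after);
    }

    public static void main(String[] args) {
        List<String> input = new java.util.ArrayList<>(List.of("a", "b"));
        Function<List<String>, byte[]> encode =
            l -> String.join(",", l).getBytes(StandardCharsets.UTF_8);

        // A well-behaved transform leaves its input untouched.
        System.out.println(wasMutated(input, encode, () -> {})); // false

        // A misbehaving one mutates the element in place and is caught.
        System.out.println(wasMutated(input, encode, () -> input.add("c"))); // true
    }
}
```

A check like this is cheap enough for a test-oriented runner, but it doubles encoding work per element, which is exactly why a production runner would not want it on the hot path.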
On Tue, Oct 27, 2020 at 3:49 AM David Morávek <d...@apache.org> wrote:
> Hi Teodor,
>
> Thanks for bringing this up. This is a known, long-standing "issue".
> Unfortunately there are a few things we need to consider:
>
> - As you correctly noted, the *Beam model doesn't enforce immutability*
> of input / output elements, so this is the price.
> - We *cannot break* existing pipelines.
> - The Flink Runner needs to provide the *same guarantees as the Beam model*.
>
> There are definitely some things we can do here to make things faster:
>
> - We can try an approach similar to HadoopIO
> (HadoopInputFormatReader#isKnownImmutable): check for known immutable
> types (KV, primitives, protobuf, other known internal immutable structures).
> - *If the type is immutable, we can safely reuse it.* This should cover
> most of the performance cost without breaking the guarantees the Beam model
> provides.
> - We can enable registration of custom "immutable" types via pipeline
> options? (This may be an unnecessary knob, so it needs further
> discussion.)
>
> WDYT?
>
> D.
>
>
> On Mon, Oct 26, 2020 at 6:37 PM Teodor Spæren <teodor_spae...@riseup.net>
> wrote:
>
>> Hey!
>>
>> I'm a student at the University of Oslo, and I'm writing a master's thesis
>> about the possibility of using Beam to benchmark stream processing
>> systems. An important factor in this is the overhead associated with
>> using Beam over writing code for the runner directly. [1] found that
>> there was a large overhead associated with using Beam, but did not
>> investigate where this overhead came from. I've run benchmarks and
>> confirmed the findings there: for simple chains of identity
>> operators, Beam is 43x slower than the Flink equivalent.
>>
>> These are very simple pipelines, with custom sources that just output a
>> series of integers. By profiling, I've found that most of the overhead
>> comes from serializing and deserializing.
>> Specifically, it comes from the way
>> TypeSerializer [2] is implemented in [3], where each object is
>> serialized and then deserialized between every operator. Looking into
>> the semantics of Beam, no operator should change its input, so we don't
>> need to make a copy here. The function in [3] could potentially be changed
>> to a single `return` statement.
>>
>> Doing this removes 80% of the overhead in my tests. This is a very
>> synthetic example, but it's low-hanging fruit and might give a speed
>> boost to many pipelines when run on the Flink runner. I would like to
>> make this my first contribution to Beam, but as the guide [4] says, I
>> thought I'd ask here first to see if there is a reason not to do this.
>>
>> The only objection I can see is that it might break existing pipelines
>> which rely on the Flink runner saving them from not following the
>> immutability guarantee. I see this as a small loss, as they are relying
>> on an implementation detail of the Flink runner.
>>
>> I hope I have explained this adequately, and I eagerly await any feedback :)
>>
>> Best regards,
>> Teodor Spæren
>>
>> [1]: https://arxiv.org/abs/1907.08302
>> [2]:
>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
>> [3]:
>> https://github.com/apache/beam/blob/master/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java#L84
>> [4]: https://beam.apache.org/contribute/
>>
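The change being discussed, combining Teodor's observation with David's "known immutable" safeguard, can be sketched as follows. This is a simplified stand-in for `CoderTypeSerializer#copy`, not the actual Beam code: the allow-list of types and the `isKnownImmutable` helper are illustrative assumptions modeled on `HadoopInputFormatReader#isKnownImmutable`, and the coder round trip is abstracted into a function argument.

```java
import java.util.Set;
import java.util.function.UnaryOperator;

public class CopyElision {
    // Illustrative allow-list of deeply immutable types; Beam's real list
    // would also cover KV, protobuf messages, and other internal types.
    private static final Set<Class<?>> KNOWN_IMMUTABLE = Set.of(
        String.class, Integer.class, Long.class, Double.class, Boolean.class);

    static boolean isKnownImmutable(Object value) {
        return value != null && KNOWN_IMMUTABLE.contains(value.getClass());
    }

    // Today the Flink runner round-trips every element through its coder
    // (serialize + deserialize) to produce a defensive copy. The proposal:
    // return known-immutable elements as-is, round-trip only the rest.
    static <T> T copy(T value, UnaryOperator<T> coderRoundTrip) {
        if (isKnownImmutable(value)) {
            return value; // safe to reuse: the element cannot be mutated
        }
        return coderRoundTrip.apply(value); // defensive copy for everything else
    }

    public static void main(String[] args) {
        int[] roundTrips = {0};
        UnaryOperator<String> rt = s -> { roundTrips[0]++; return s; };
        copy("hello", rt);
        System.out.println(roundTrips[0]); // 0: immutable type, copy elided
    }
}
```

This keeps the Beam model's guarantees for mutable elements while eliminating the serialize/deserialize cost on the (common) immutable path, which is where the bulk of the measured overhead came from.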