It seems that many correct things are said on this thread.

1. Elements of a PCollection are immutable. They should be like mathematical values.
2. For performance reasons, the author of a DoFn is responsible for not mutating input elements, and also for not mutating outputs once they have been output.
3. The direct runner does extra work to check whether your DoFn* is wrong.
4. On a production runner, serialization is expected to occur only when needed for shipping data**

If the FlinkRunner is serializing things that don't have to be shipped, that seems like a great easy win.

Kenn

*notably CombineFn has an API that is broken; only the first accumulator is allowed to be mutated and a runner is responsible for cloning it as necessary; it is expected that combining many elements will execute by mutating one unaliased accumulator many times

**small caveat that when doing in-memory groupings you need to use Coder#structuralValue and group by that, which may serialize but hopefully does something smarter

On Tue, Oct 27, 2020 at 8:52 AM Reuven Lax <re...@google.com> wrote:

> Actually I believe that the Beam model does say that input elements should
> be immutable. If I remember correctly, the DirectRunner even validates this
> in unit tests, failing tests if the input elements have been mutated.
>
> On Tue, Oct 27, 2020 at 3:49 AM David Morávek <d...@apache.org> wrote:
>
>> Hi Teodor,
>>
>> Thanks for bringing this up. This is a known, long-standing "issue".
>> Unfortunately there are a few things we need to consider:
>>
>> - As you correctly noted, the *Beam model doesn't enforce immutability*
>> of input / output elements, so this is the price.
>> - We *cannot break* existing pipelines.
>> - Flink Runner needs to provide the *same guarantees as the Beam model*.
>>
>> There are definitely some things we can do here to make things faster:
>>
>> - We can try an approach similar to HadoopIO's
>> (HadoopInputFormatReader#isKnownImmutable) and check for known immutable
>> types (KV, primitives, protobuf, other known internal immutable structures).
>> - *If the type is immutable, we can safely reuse it.* This should cover
>> most of the performance costs without breaking the guarantees the Beam model
>> provides.
>> - We can enable registration of custom "immutable" types via pipeline
>> options? (This may be an unnecessary knob, so it needs further
>> discussion.)
>>
>> WDYT?
>>
>> D.
>>
>> On Mon, Oct 26, 2020 at 6:37 PM Teodor Spæren <teodor_spae...@riseup.net>
>> wrote:
>>
>>> Hey!
>>>
>>> I'm a student at the University of Oslo, and I'm writing a master thesis
>>> about the possibility of using Beam to benchmark stream processing
>>> systems. An important factor in this is the overhead associated with
>>> using Beam over writing code for the runner directly. [1] found that
>>> there was a large overhead associated with using Beam, but did not
>>> investigate where this overhead came from. I've done benchmarks and
>>> confirmed the findings there: for simple chains of identity
>>> operators, Beam is 43x slower than the Flink equivalent.
>>>
>>> These are very simple pipelines, with custom sources that just output a
>>> series of integers. By profiling I've found that most of the overhead
>>> comes from serializing and deserializing. Specifically, it comes from the
>>> way TypeSerializer [2] is implemented in [3], where each object is
>>> serialized and then deserialized between every operator. Looking into
>>> the semantics of Beam, no operator should change the input, so we don't
>>> need to do a copy here. The function in [3] could potentially be changed
>>> to a single `return` statement.
>>>
>>> Doing this removes 80% of the overhead in my tests. This is a very
>>> synthetic example, but it's low-hanging fruit and might give a speed
>>> boost to many pipelines when run on the Flink runner. I would like to
>>> make this my first contribution to Beam, but as the guide [4] says, I
>>> thought I'd ask here first to see if there is a reason not to do this.
>>>
>>> The only objection I can see is that it might break existing pipelines
>>> which rely on the Flink runner saving them from not following the
>>> immutability guarantee. I see this as a small loss, as they are relying
>>> on an implementation detail of the Flink runner.
>>>
>>> I hope I have explained this adequately and eagerly await any feedback :)
>>>
>>> Best regards,
>>> Teodor Spæren
>>>
>>> [1]: https://arxiv.org/abs/1907.08302
>>> [2]:
>>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
>>> [3]:
>>> https://github.com/apache/beam/blob/master/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java#L84
>>> [4]: https://beam.apache.org/contribute/
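
For readers following the thread, the "skip the copy for known-immutable types" idea that David describes can be sketched roughly as below. This is only an illustration in plain Java, not the code of CoderTypeSerializer or HadoopInputFormatReader: the class name `ImmutableAwareCopier`, the allow-list contents, and the use of Java serialization as a stand-in for a Beam Coder round trip are all assumptions made for the example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical helper: reuse values of known-immutable types, deep-copy everything else.
final class ImmutableAwareCopier {

    // Assumed allow-list for the sketch; a real runner would need a more careful list.
    private static final Set<Class<?>> KNOWN_IMMUTABLE =
        Set.of(String.class, Integer.class, Long.class, Double.class, Boolean.class);

    static boolean isKnownImmutable(Object value) {
        return value == null || KNOWN_IMMUTABLE.contains(value.getClass());
    }

    // Returns the same instance for known-immutable values; otherwise performs a
    // serialize/deserialize round trip (standing in for encoding with a Coder).
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T copy(T value) {
        if (isKnownImmutable(value)) {
            return value; // safe to share: nobody can mutate it
        }
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Failed to copy value", e);
        }
    }

    public static void main(String[] args) {
        String text = "hello";
        System.out.println("String reused: " + (copy(text) == text));

        ArrayList<Integer> numbers = new ArrayList<>(List.of(1, 2, 3));
        ArrayList<Integer> copied = copy(numbers);
        System.out.println("List copied: " + (copied != numbers && copied.equals(numbers)));
    }
}
```

The fast path keeps the existing guarantee for mutable types while avoiding the round trip for values that cannot be changed anyway, which is the trade-off discussed above; whether an allow-list like this covers enough real pipelines is exactly the open question in the thread.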