To stop receiving these messages, you need to send an email to: [email protected]

On Tue, Nov 3, 2020 at 10:52 AM Manuela Chamda Tchakoute <[email protected]> wrote:
>
> Hi,
> Please can someone remove me from this chat?
>
> On Oct 27, 2020 11:49 AM, "David Morávek" <[email protected]> wrote:
>>
>> Hi Teodor,
>>
>> Thanks for bringing this up. This is a known, long-standing "issue".
>> Unfortunately there are a few things we need to consider:
>>
>> - As you correctly noted, the Beam model doesn't enforce immutability of
>>   input / output elements, so this is the price.
>> - We cannot break existing pipelines.
>> - Flink Runner needs to provide the same guarantees as the Beam model.
>>
>> There are definitely some things we can do here, to make things faster:
>>
>> - We can try a similar approach to HadoopIO
>>   (HadoopInputFormatReader#isKnownImmutable), to check for known immutable
>>   types (KV, primitives, protobuf, other known internal immutable structures).
>> - If the type is immutable, we can safely reuse it. This should cover most
>>   of the performance costs without breaking the guarantees the Beam model provides.
>> - We can enable registration of custom "immutable" types via pipeline
>>   options? (this may be an unnecessary knob, so this needs a further
>>   discussion)
>>
>> WDYT?
>>
>> D.
>>
>>
>> On Mon, Oct 26, 2020 at 6:37 PM Teodor Spæren <[email protected]>
>> wrote:
>>>
>>> Hey!
>>>
>>> I'm a student at the University of Oslo, and I'm writing a master thesis
>>> about the possibility of using Beam to benchmark stream processing
>>> systems. An important factor in this is the overhead associated with
>>> using Beam over writing code for the runner directly. [1] found that
>>> there was a large overhead associated with using Beam, but did not
>>> investigate where this overhead came from. I've done benchmarks and
>>> confirmed the findings there, where for simple chains of identity
>>> operators, Beam is 43 times slower than the Flink equivalent.
>>>
>>> These are very simple pipelines, with custom sources that just output a
>>> series of integers. By profiling I've found that most of the overhead
>>> comes from serializing and deserializing. Specifically the way
>>> TypeSerializer [2] is implemented in [3], where each object is
>>> serialized and then deserialized between every operator. Looking into
>>> the semantics of Beam, no operator should change the input, so we don't
>>> need to do a copy here. The function in [3] could potentially be changed
>>> to a single `return` statement.
>>>
>>> Doing this removes 80% of the overhead in my tests. This is a very
>>> synthetic example, but it's a low-hanging fruit and might give a speed
>>> boost to many pipelines when run on the Flink runner. I would like to
>>> make this my first contribution to Beam, but as the guide [4] says, I
>>> thought I'd ask here first to see if there is a reason not to do this.
>>>
>>> The only objection I can see is that it might break existing pipelines
>>> which rely on the Flink runner saving them from not following the
>>> immutability guarantee. I see this as a small loss as they are relying
>>> on an implementation detail of the Flink runner.
>>>
>>> I hope I have explained this adequately and eagerly await any feedback :)
>>>
>>> Best regards,
>>> Teodor Spæren
>>>
>>> [1]: https://arxiv.org/abs/1907.08302
>>> [2]:
>>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
>>> [3]:
>>> https://github.com/apache/beam/blob/master/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java#L84
>>> [4]: https://beam.apache.org/contribute/
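
For anyone following the thread, the trade-off discussed above (copying every element between operators versus reusing elements of known immutable types) can be sketched roughly as follows. This is only an illustration under stated assumptions, not the Flink runner's actual code: the class `CopySketch`, its methods, and the `KNOWN_IMMUTABLE` allow-list are hypothetical names, and plain Java serialization stands in for a Beam coder.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Set;

// Hypothetical sketch; none of these names come from Beam or Flink.
final class CopySketch {

    // Assumed allow-list of types whose instances can never be mutated.
    private static final Set<Class<?>> KNOWN_IMMUTABLE =
        Set.of(Integer.class, Long.class, Double.class, Boolean.class, String.class);

    static boolean isKnownImmutable(Object value) {
        return value == null || KNOWN_IMMUTABLE.contains(value.getClass());
    }

    // Defensive copy: serialize the element, then deserialize it again.
    // Java serialization is a stand-in here for encoding with a coder.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T copyByRoundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Could not copy element", e);
        }
    }

    // Reuse the element when its type is known to be immutable,
    // otherwise fall back to the round-trip copy.
    static <T extends Serializable> T copy(T value) {
        return isKnownImmutable(value) ? value : copyByRoundTrip(value);
    }
}
```

With this shape, `CopySketch.copy("hello")` hands back the same `String` instance, while a mutable `ArrayList` is still copied, so a downstream operator that mutates it cannot affect the original. Whether an allow-list like this should be fixed or user-extensible is the open question raised in the thread.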
