To stop receiving these messages, you need to send an email to: [email protected]

On Tue, Nov 3, 2020 at 10:52 AM Manuela Chamda Tchakoute
<[email protected]> wrote:
>
> Hi,
> Please can someone remove me from this chat?
>
> On Oct 27, 2020 11:49 AM, "David Morávek" <[email protected]> wrote:
>>
>> Hi Teodor,
>>
>> Thanks for bringing this up. This is a known, long-standing "issue".
>> Unfortunately, there are a few things we need to consider:
>>
>> - As you correctly noted, the Beam model doesn't enforce immutability of
>> input / output elements, so this defensive copying is the price we pay.
>> - We cannot break existing pipelines.
>> - The Flink Runner needs to provide the same guarantees as the Beam model.
>>
>> There are definitely some things we can do here to make things faster:
>>
>> - We can try an approach similar to HadoopIO's
>> (HadoopInputFormatReader#isKnownImmutable): check for known immutable
>> types (KV, primitives, protobuf, other known internal immutable structures).
>> - If the type is immutable, we can safely reuse it. This should eliminate most
>> of the performance cost without breaking the guarantees the Beam model provides.
>> - We could enable registration of custom "immutable" types via pipeline
>> options (this may be an unnecessary knob, so it needs further
>> discussion).
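>> To make the first point concrete, a minimal sketch of that kind of
>> whitelist check (all names here are hypothetical stand-ins, not the
>> actual HadoopInputFormatReader#isKnownImmutable code):

```java
import java.util.Set;

// Hypothetical sketch of an immutability whitelist, in the spirit of
// HadoopInputFormatReader#isKnownImmutable: types on the list are safe
// to reuse between operators without a defensive copy.
class ImmutabilityCheck {

    // Types treated as immutable; real code would also cover Beam's KV,
    // protobuf messages, and other known internal immutable structures.
    private static final Set<Class<?>> KNOWN_IMMUTABLE = Set.of(
        String.class, Integer.class, Long.class, Double.class,
        Boolean.class, Byte.class, Short.class, Character.class);

    static boolean isKnownImmutable(Object value) {
        // null is trivially safe to reuse; otherwise consult the whitelist.
        return value == null || KNOWN_IMMUTABLE.contains(value.getClass());
    }

    public static void main(String[] args) {
        System.out.println(isKnownImmutable("hello"));      // true: String is immutable
        System.out.println(isKnownImmutable(new int[]{1})); // false: arrays are mutable
    }
}
```

>> The registration knob from the last point would then just add entries
>> to that set at pipeline construction time.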
>>
>> WDYT?
>>
>> D.
>>
>>
>> On Mon, Oct 26, 2020 at 6:37 PM Teodor Spæren <[email protected]> 
>> wrote:
>>>
>>> Hey!
>>>
>>> I'm a student at the University of Oslo, and I'm writing a master's thesis
>>> on the possibility of using Beam to benchmark stream processing
>>> systems. An important factor in this is the overhead of using Beam
>>> compared to writing code for the runner directly. The authors of [1]
>>> found a large overhead associated with using Beam, but did not
>>> investigate where it came from. I've run my own benchmarks and
>>> confirmed their findings: for simple chains of identity operators,
>>> Beam is 43 times slower than the Flink equivalent.
>>>
>>> These are very simple pipelines, with custom sources that just output a
>>> series of integers. Through profiling I've found that most of the overhead
>>> comes from serialization and deserialization. Specifically, the way
>>> TypeSerializer [2] is implemented in [3] means each object is
>>> serialized and then deserialized between every operator. Looking into
>>> the semantics of Beam, no operator should change its input, so we don't
>>> need to make a copy here. The copy function in [3] could potentially be
>>> reduced to a single `return` statement.
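>>> To illustrate the idea, here is a hedged sketch (with stand-in names,
>>> not the actual Beam or Flink classes from [2] and [3]) of the
>>> copy-via-coder behaviour versus the proposed single `return`:

```java
import java.io.*;

// Illustrative sketch only: the real method lives in Beam's
// CoderTypeSerializer [3]; all names below are hypothetical.

// Stand-in for Beam's Coder abstraction.
interface Coder<T> {
    void encode(T value, OutputStream out) throws IOException;
    T decode(InputStream in) throws IOException;
}

// A trivial string coder, just to make the sketch runnable.
class StringUtf8Coder implements Coder<String> {
    public void encode(String value, OutputStream out) throws IOException {
        out.write(value.getBytes("UTF-8"));
    }
    public String decode(InputStream in) throws IOException {
        return new String(in.readAllBytes(), "UTF-8");
    }
}

class CoderTypeSerializerSketch<T> {
    private final Coder<T> coder;

    CoderTypeSerializerSketch(Coder<T> coder) { this.coder = coder; }

    // Current behaviour: a deep copy by serializing and then
    // deserializing the element between every chained operator.
    T copyViaCoder(T value) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            coder.encode(value, bos);
            return coder.decode(new ByteArrayInputStream(bos.toByteArray()));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    // Proposed behaviour: rely on the Beam model's contract that
    // operators must not mutate their inputs, and skip the round trip.
    T copyByReuse(T value) {
        return value;
    }

    public static void main(String[] args) {
        CoderTypeSerializerSketch<String> s =
            new CoderTypeSerializerSketch<>(new StringUtf8Coder());
        String v = new String("hello");
        System.out.println(s.copyViaCoder(v).equals(v)); // equal content, new object
        System.out.println(s.copyByReuse(v) == v);       // same object, no copy cost
    }
}
```

>>> The serialize/deserialize round trip in the first method is what the
>>> profiler attributes most of the overhead to.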
>>>
>>> Doing this removes 80% of the overhead in my tests. This is a very
>>> synthetic example, but it's low-hanging fruit and might give a speed
>>> boost to many pipelines run on the Flink runner. I would like to
>>> make this my first contribution to Beam, but as the contribution guide [4]
>>> suggests, I thought I'd ask here first to see if there is a reason not to do this.
>>>
>>> The only objection I can see is that it might break existing pipelines
>>> that rely on the Flink runner saving them from violating the
>>> immutability guarantee. I see this as a small loss, as they are relying
>>> on an implementation detail of the Flink runner.
>>>
>>> I hope I have explained this adequately, and I eagerly await any feedback :)
>>>
>>> Best regards,
>>> Teodor Spæren
>>>
>>> [1]: https://arxiv.org/abs/1907.08302
>>> [2]: 
>>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
>>> [3]: 
>>> https://github.com/apache/beam/blob/master/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java#L84
>>> [4]: https://beam.apache.org/contribute/
