Hi Teodor and Max,

I don't think all runners need to behave exactly the same way, because different runners have different purposes. The purpose of the DirectRunner is to verify the pipeline's code and (if it succeeds) to validate that it will (hopefully :)) run on all other possible runners. Because of that, each runner can have a different policy about what is opt-in and what is opt-out. The DirectRunner will tend to be the strictest of all runners, simply because it wants to ensure that the pipeline will run fine on the biggest subset of possible runners.

On the other hand, users are not obliged to test their pipelines on the DirectRunner (they might choose a local FlinkRunner), so it seems valid to let the FlinkRunner opt out of the copy. The FlinkRunner's purpose is to run the user code no matter what (and because of how it is constructed, it is difficult for it to do the same check the DirectRunner does). Therefore, I'd suggest we do the following:

 a) add the mentioned opt-out parameter for the copy

 b) if the parameter is not set (as in all existing pipelines), log a performance warning telling the user to 1) verify that the pipeline doesn't modify its input and output objects by validating it against the DirectRunner, and 2) if it passes, set the opt-out parameter for the performance benefit (a rough sketch of both follows below)
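
To make this concrete, here is a minimal sketch of what a) and b) could look like on the Flink runner (the name "fasterCopy" is just borrowed from Teodor's suggestion below, not an agreed API):

    import org.apache.beam.sdk.options.Default;
    import org.apache.beam.sdk.options.Description;
    import org.apache.beam.sdk.options.PipelineOptions;

    // Hypothetical opt-out option, name illustrative only.
    public interface FasterCopyOptions extends PipelineOptions {
      @Description(
          "Remove unneeded deep copies between chained operators. Safe only "
              + "if no transform mutates its input or output elements.")
      @Default.Boolean(false)
      Boolean getFasterCopy();

      void setFasterCopy(Boolean fasterCopy);
    }

    // In the runner's translation layer, roughly:
    //   if (!options.getFasterCopy()) {
    //     LOG.warn("Deep copies enabled (the default). If the pipeline passes "
    //         + "the DirectRunner's immutability checks, set --fasterCopy=true.");
    //   }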

WDYT?

 Jan

On 10/29/20 8:38 AM, Teodor Spæren wrote:
Ok, then we are on the same page, but I disagree with your conclusion. The reason Flink has to do the deep copy is that it doesn't state that inputs are immutable and must not be changed. In Beam, the user is not supposed to modify the input collection, and if they do, it's undefined behavior. This is the reason the DirectRunner checks for this: to make sure users are not relying on it.

As I said in an email yesterday, I changed the Flink runner to just return the input and all tests passed. I still think an opt-in option is the right way to go, so as not to break existing pipelines.

Best regards,
Teodor Spæren

On Wed, Oct 28, 2020 at 07:29:06PM +0100, Maximilian Michels wrote:
You are right that Flink serializers do not bother to copy immutable Java types, e.g. Long, Integer, String. However, POJOs and other custom types can be mutated, and Flink does a deep copy in that case.

If you look at the PojoSerializer in Flink, you will see that it does a deep copy: https://github.com/apache/flink/blob/d13f66be552eac89f45469c199ae036087baa38d/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L228

Also Flink uses Java serialization if the generic Kryo serializer fails: https://github.com/apache/flink/blob/d13f66be552eac89f45469c199ae036087baa38d/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java#L251

In Beam we are just wrapping around Beam coders, so we do not know if a type is mutable or not. This is why we always deep-copy. I'm not sure that the change to always return the input would be safe. However, we could add some exceptions for Beam types we are sure cannot be mutated. Also, a pipeline option is ok if it is opt-in.
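
To illustrate the contrast (paraphrasing the Flink and Beam sources linked above, not quoting them verbatim):

    // Paraphrased from Flink's LongSerializer: Long is immutable, so
    // copy() can just hand back the same instance.
    @Override
    public Long copy(Long from) {
      return from;
    }

    // Flink's PojoSerializer#copy (first link above), by contrast, does a
    // deep field-by-field copy, because a POJO may be mutable.
    //
    // Beam's CoderTypeSerializer wraps an arbitrary Beam Coder and cannot
    // tell whether the wrapped type is mutable, so it always takes the
    // conservative encode/decode path.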

-Max

On 28.10.20 11:59, Teodor Spæren wrote:
Hey Max!

Just to make sure we are on the same page:

When you say "Flinks default behavior" do you mean Apache Flink the project or Beams Flink Runner? I'm assuming the Flink Runner, since Apache Flink leaves it up to the TypeSerializer to decide how to copy and none of the ones I've seen so far choose to do it through a serialization then deserialization method.

Is the bug really hard to detect? As far as I know, using the direct runner will warn of this misuse by default, without any need to change the pipeline itself. Please correct me if I'm wrong; I don't have much experience with Beam!

PCollections being immutable does mean that the input element should not be modified; rather, if a modification is needed, it is up to the user to copy the input before changing it [1] (3.2.3 Immutability). I think this is what you are saying, but I just wanted to make sure :)
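
In code, my reading of the contract from [1] is roughly this (a hypothetical DoFn over a mutable element type, purely for illustration):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.beam.sdk.transforms.DoFn;

    // Sketch: a DoFn must not modify its input element; if it needs a
    // changed version, it copies first and only mutates the copy.
    class AppendOneFn extends DoFn<List<Integer>, List<Integer>> {
      @ProcessElement
      public void process(@Element List<Integer> in,
          OutputReceiver<List<Integer>> out) {
        List<Integer> copy = new ArrayList<>(in); // copy instead of mutating
        copy.add(1);
        out.output(copy); // and don't touch `copy` after outputting it either
      }
    }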

Also, I think naming the flag anything with "object reuse" in it would be confusing to users, as Flink already has the concept of object reuse [2] (enableObjectReuse), and there is already an option on the runner mentioning this [3] (objectReuse). I'm thinking of something more along the lines of "fasterCopy" or "disableFailsafeCopying".

Best regards,
Teodor Spæren

[1]: https://beam.apache.org/documentation/programming-guide/#pcollection-characteristics

[2]: https://ci.apache.org/projects/flink/flink-docs-stable/dev/execution_configuration.html

[3]: https://beam.apache.org/documentation/runners/flink/#pipeline-options-for-the-flink-runner


On Wed, Oct 28, 2020 at 10:31:51AM +0100, Maximilian Michels wrote:
Very good observation @Teodor!

Flink's default behavior is to copy elements by sending them through a serialization/deserialization roundtrip. This occurs regardless of whether operators are "chained" (i.e. directly pass on data without going through the network) or not.

This default was chosen deliberately: users tend to forget that you must not modify the input, and doing so can cause very hard-to-detect "bugs".

PCollections are immutable, but that does not mean transforms are not allowed to mutate inputs. Rather, it means that the original element will not change; only a copy of it will.

+1 for a flag to enable object reuse
-1 for changing the default for the above reason

Cheers,
Max

On 27.10.20 21:54, Kenneth Knowles wrote:
It seems that many correct things are said on this thread.

1. Elements of a PCollection are immutable. They should be like mathematical values.
2. For performance reasons, the author of a DoFn is responsible for not mutating input elements and for not mutating outputs once they have been output.
3. The direct runner does extra work to check if your DoFn* is wrong.
4. On a production runner it is expected that serialization only occurs when needed for shipping data**

If the FlinkRunner is serializing things that don't have to be shipped, that seems like a great easy win.

Kenn

* Notably, CombineFn has an API that is broken: only the first accumulator is allowed to be mutated, and a runner is responsible for cloning it as necessary; it is expected that combining many elements will execute by mutating one unaliased accumulator many times.

** Small caveat: when doing in-memory groupings you need to use Coder#structuralValue and group by that, which may serialize but hopefully does something smarter.
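
To illustrate the ** caveat, here is a rough sketch of an in-memory grouping keyed by structural value (Coder#structuralValue is the real method; the rest is illustrative):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import org.apache.beam.sdk.coders.Coder;

    // Group elements by coder.structuralValue(e) rather than by the element
    // itself, so types whose equals() is unreliable still group correctly.
    static <T> Map<Object, List<T>> groupInMemory(Coder<T> coder, Iterable<T> elements) {
      Map<Object, List<T>> groups = new HashMap<>();
      for (T e : elements) {
        Object key = coder.structuralValue(e); // may encode to bytes internally
        groups.computeIfAbsent(key, k -> new ArrayList<>()).add(e);
      }
      return groups;
    }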

On Tue, Oct 27, 2020 at 8:52 AM Reuven Lax <re...@google.com> wrote:

   Actually I believe that the Beam model does say that input elements
   should be immutable. If I remember correctly, the DirectRunner even
   validates this in unit tests, failing tests if the input elements have
   been mutated.
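
   A minimal sketch of what that check catches (a hypothetical pipeline,
   written from memory against the public Beam API; default coder
   inference for byte[] is assumed):

       import org.apache.beam.sdk.Pipeline;
       import org.apache.beam.sdk.transforms.Create;
       import org.apache.beam.sdk.transforms.DoFn;
       import org.apache.beam.sdk.transforms.ParDo;

       Pipeline p = Pipeline.create();
       p.apply(Create.of(new byte[] {1, 2, 3}))
           .apply(ParDo.of(new DoFn<byte[], Integer>() {
             @ProcessElement
             public void process(@Element byte[] b, OutputReceiver<Integer> out) {
               b[0] = 42; // mutates the input element in place
               out.output((int) b[0]);
             }
           }));
       p.run(); // the DirectRunner should fail this with a mutation error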

   On Tue, Oct 27, 2020 at 3:49 AM David Morávek <d...@apache.org> wrote:

       Hi Teodor,

        Thanks for bringing this up. This is a known, long-standing
        "issue". Unfortunately, there are a few things we need to consider:

        - As you correctly noted, the *Beam model doesn't enforce
        immutability* of input / output elements, so this is the price.
        - We *can not break* existing pipelines.
        - The Flink Runner needs to provide the *same guarantees as the
        Beam model*.

       There are definitely some things we can do here, to make things
       faster:

        - We can try a similar approach to HadoopIO
        (HadoopInputFormatReader#isKnownImmutable): check for known
        immutable types (KV, primitives, protobuf, other known internal
        immutable structures). See the sketch after this list.
        - *If the type is immutable, we can safely reuse it.* This should
        cover most of the performance cost without breaking the
        guarantees the Beam model provides.
        - We could enable registration of custom "immutable" types via
        pipeline options? (This may be an unnecessary knob, so it needs
        further discussion.)
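
        A rough sketch of the known-immutable check (the class list is
        illustrative, modeled on HadoopInputFormatReader#isKnownImmutable;
        KV would additionally need its key and value checked):

            import java.util.Arrays;
            import java.util.HashSet;
            import java.util.Set;

            // Types we trust to be deeply immutable, so reusing the
            // instance is safe and the coder roundtrip can be skipped.
            private static final Set<Class<?>> KNOWN_IMMUTABLE =
                new HashSet<>(Arrays.asList(
                    String.class, Long.class, Integer.class, Short.class,
                    Byte.class, Double.class, Float.class, Boolean.class));

            static boolean isKnownImmutable(Object o) {
              return o != null && KNOWN_IMMUTABLE.contains(o.getClass());
            }

            // In CoderTypeSerializer#copy, roughly:
            //   return isKnownImmutable(t) ? t : deepCopyViaCoder(t);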

       WDYT?

       D.


        On Mon, Oct 26, 2020 at 6:37 PM Teodor Spæren
        <teodor_spae...@riseup.net> wrote:

           Hey!

            I'm a student at the University of Oslo, and I'm writing a
            master thesis about the possibility of using Beam to benchmark
            stream processing systems. An important factor in this is the
            overhead associated with using Beam over writing code for the
            runner directly. [1] found that there was a large overhead
            associated with using Beam, but did not investigate where this
            overhead came from. I've done benchmarks and confirmed the
            findings there: for simple chains of identity operators, Beam
            is 43x slower than the Flink equivalent.

            These are very simple pipelines, with custom sources that just
            output a series of integers. By profiling, I've found that most
            of the overhead comes from serializing and deserializing.
            Specifically, it is the way TypeSerializer [2] is implemented
            in [3], where each object is serialized and then deserialized
            between every operator. Looking into the semantics of Beam, no
            operator should change the input, so we don't need to do a copy
            here. The function in [3] could potentially be changed to a
            single `return` statement (see the sketch below).
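
            To make the proposal concrete, [3] currently does roughly
            this (paraphrased; exception handling and the Flink
            TypeSerializer boilerplate omitted):

                // Current CoderTypeSerializer#copy: a full encode/decode
                // roundtrip per element, between every pair of operators.
                // (Streams are java.io.ByteArray{Input,Output}Stream.)
                public T copy(T t) {
                  ByteArrayOutputStream bytes = new ByteArrayOutputStream();
                  coder.encode(t, bytes);
                  return coder.decode(
                      new ByteArrayInputStream(bytes.toByteArray()));
                }

                // Proposed: Beam semantics say operators must not mutate
                // their inputs, so the copy could simply become:
                public T copy(T t) {
                  return t;
                }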

            Doing this removes 80% of the overhead in my tests. This is a
            very synthetic example, but it's low-hanging fruit and might
            give a speed boost to many pipelines when run on the Flink
            runner. I would like to make this my first contribution to
            Beam, but as the guide [4] says, I thought I'd ask here first
            to see if there is a reason not to do this.

            The only objection I can see is that it might break existing
            pipelines that rely on the Flink runner to save them when they
            don't follow the immutability guarantee. I see this as a small
            loss, as they are relying on an implementation detail of the
            Flink runner.

            I hope I have explained this adequately, and I eagerly await
            any feedback :)

           Best regards,
           Teodor Spæren

            [1]: https://arxiv.org/abs/1907.08302
            [2]: https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
            [3]: https://github.com/apache/beam/blob/master/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java#L84
            [4]: https://beam.apache.org/contribute/
