Hey Max!
Just to make sure we are on the same page:
When you say "Flinks default behavior" do you mean Apache Flink the
project or Beams Flink Runner? I'm assuming the Flink Runner, since
Apache Flink leaves it up to the TypeSerializer to decide how to copy
and none of the ones I've seen so far choose to do it through a
serialization then deserialization method.
Is the bug hard to detect? Using the direct runner will warn of this
missuse by default, without any need to change the pipeline itself, as
far as I know? Please correct me if I'm wrong, I don't have much
experience with Beam!
PCollections being immutable does mean that the input element should not
be modified, rather if a modification is needed it is up to the user to
copy the input before changing it [1](3.2.3 Immutability). I think
this is what you are saying, but I just wanted to make sure :)
Also, I think naming the flag anything with object reuse would be
confusing to users, as flink already has the concept of object reuse
[2](enableObjectReuse), and there is an option on the runner mentioning
this already[3](objectReuse). I'm thinking something more along the
lines of "fasterCopy" or "disableFailsafeCopying".
Best regards,
Teodor Spæren
[1]:
https://beam.apache.org/documentation/programming-guide/#pcollection-characteristics
[2]:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/execution_configuration.html
[3]:
https://beam.apache.org/documentation/runners/flink/#pipeline-options-for-the-flink-runner
On Wed, Oct 28, 2020 at 10:31:51AM +0100, Maximilian Michels wrote:
Very good observation @Teodor!
Flink's default behavior is to copy elements by going through a
serialization/deserialization roundtrip. This occurs regardless of
whether operatores are "chained" (directly pass on data without going
through the network) or not.
This default was chosen for a reason because users tend to forget that
you must not modify the input. It can cause very hard to detect
"bugs".
PCollections are immutable but that does not mean transforms are not
allowed to mutate inputs. Rather it means that the original element
will not change but a copy of that element.
+1 for a flag to enable object reuse
-1 for changing the default for the above reason
Cheers,
Max
On 27.10.20 21:54, Kenneth Knowles wrote:
It seems that many correct things are said on this thread.
1. Elements of a PCollection are immutable. They should be like
mathematical values.
2. For performance reasons, the author of a DoFn is responsible to
not mutate input elements and also to not mutate outputs once they
have been output.
3. The direct runner does extra work to check if your DoFn* is wrong.
4. On a production runner it is expected that serialization only
occurs when needed for shipping data**
If the FlinkRunner is serializing things that don't have to be
shipped that seems like a great easy win.
Kenn
*notably CombineFn has an API that is broken; only the first
accumulator is allowed to be mutated and a runner is responsible for
cloning it as necessary; it is expected that combining many elements
will execute by mutating one unaliased accumulator many times
**small caveat that when doing in-memory groupings you need to use
Coder#structuralValue and group by that, which may serialize but
hopefully does something smarter
On Tue, Oct 27, 2020 at 8:52 AM Reuven Lax <[email protected]
<mailto:[email protected]>> wrote:
Actually I believe that the Beam model does say that input elements
should be immutable. If I remember correctly, the DirectRunner even
validates this in unit tests, failing tests if the input elements
have been mutated.
On Tue, Oct 27, 2020 at 3:49 AM David Morávek <[email protected]
<mailto:[email protected]>> wrote:
Hi Teodor,
Thanks for bringing this up. This is a known, long standing
"issue". Unfortunately there are few things we need to consider:
- As you correctly noted, the *Beam model doesn't enforce
immutability* of input / output elements, so this is the price.
- We*can not break *existing pipelines.
- Flink Runner needs to provide the *same guarantees as the Beam
model*.
There are definitely some things we can do here, to make things
faster:
- We can try the similar approach as HadoopIO
(HadoopInputFormatReader#isKnownImmutable), to check for known
immutable types (KV, primitives, protobuf, other known internal
immutable structures).
-*If the type is immutable, we can safely reuse it.* This should
cover most of the performance costs without breaking the
guarantees Beam model provides.
- We can enable registration of custom "immutable" types via
pipeline options? (this may be an unnecessary knob, so this
needs a further discussion)
WDYT?
D.
On Mon, Oct 26, 2020 at 6:37 PM Teodor Spæren
<[email protected] <mailto:[email protected]>>
wrote:
Hey!
I'm a student at the University of Oslo, and I'm writing a
master thesis
about the possibility of using Beam to benchmark stream
processing
systems. An important factor in this is the overhead
associated with
using Beam over writing code for the runner directly. [1]
found that
there was a large overhead associated with using Beam, but
did not
investigate where this overhead came from. I've done
benchmarks and
confirmed the findings there, where for simple chains of
identity
operators, Beam is 43x times slower than the Flink equivalent.
These are very simple pipelines, with custom sources that
just output a
series of integers. By profiling I've found that most of the
overhead
comes from serializing and deserializing. Specifically the way
TypeSerializer's, [2], is implemented in [3], where each
object is
serialized and then deserialized between every operator.
Looking into
the semantics of Beam, no operator should change the input,
so we don't
need to do a copy here. The function in [3] could
potentially be changed
to a single `return` statement.
Doing this removes 80% of the overhead in my tests. This is
a very
synthetic example, but it's a low hanging fruit and might
give a speed
boost to many pipelines when run on the Flink runnner. I
would like to
make this my first contribution to Beam, but as the guide
[4] says, I
thought I'd ask here first to see if there a is a reason not
to do this.
Only objection I can see, is that it might break existing
pipelines
which rely on the Flink runner saving them from not
following the
immutability guarantee. I see this as a small loss as they
are relying
on an implementation detail of the Flink runner.
I hope I have explained this adequately and eagerly away any
feedback :)
Best regards,
Teodor Spæren
[1]: https://arxiv.org/abs/1907.08302
<https://arxiv.org/abs/1907.08302>
[2]:
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
<https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java>
[3]:
https://github.com/apache/beam/blob/master/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java#L84
<https://github.com/apache/beam/blob/master/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java#L84>
[4]: https://beam.apache.org/contribute/
<https://beam.apache.org/contribute/>