Hi Alexey,
I think it should be possible to optimise this particular transformation by
using a split/select pattern in Flink. (See split and select here:
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/datastream_api.html#datastream-transformations).
The current implementation is not very optimised; my main goal was to make
all features of Beam work before going into individual optimisations.
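
To make the idea concrete, here is a minimal sketch in plain Java, with no
Flink or Beam dependency. It only illustrates the routing principle behind
split/select: each tagged value is handed to the one output registered for
its tag, instead of being broadcast to every output and filtered afterwards.
TaggedValue, TagRouter, route, select and deliveries are hypothetical names
made up for this illustration; they are not Flink or Beam APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a tagged union value (a value plus its output tag).
final class TaggedValue {
    final int tag;
    final Object value;

    TaggedValue(int tag, Object value) {
        this.tag = tag;
        this.value = value;
    }
}

// Hypothetical router: delivers each value only to the output for its tag.
final class TagRouter {
    private final Map<Integer, List<Object>> outputs = new HashMap<>();
    private int deliveries = 0;

    // Route one value to exactly one output, chosen by its tag.
    void route(TaggedValue tv) {
        outputs.computeIfAbsent(tv.tag, t -> new ArrayList<>()).add(tv.value);
        deliveries++;
    }

    // Return everything routed to the given tag (empty if nothing was).
    List<Object> select(int tag) {
        return outputs.getOrDefault(tag, new ArrayList<>());
    }

    // Number of hand-offs performed; one per input element.
    int deliveries() {
        return deliveries;
    }

    public static void main(String[] args) {
        TagRouter router = new TagRouter();
        router.route(new TaggedValue(0, "main-1"));
        router.route(new TaggedValue(1, "side-1"));
        router.route(new TaggedValue(0, "main-2"));
        System.out.println("tag 0: " + router.select(0));
        System.out.println("tag 1: " + router.select(1));
        System.out.println("deliveries: " + router.deliveries());
    }
}
```

With broadcast-then-filter, three elements and two outputs would mean six
hand-offs; in this sketch there are three, one per element.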

About object-reuse in Flink Streaming: this is a bit of a dangerous setting
and can lead to unexpected results with certain patterns. I think your
stack trace is not complete; at least I can't seem to see the root
exception.
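
As a generic illustration of why object reuse can be dangerous (this is not
a reproduction of the failure in your trace, which I can't diagnose without
the root exception): when an upstream producer keeps handing out the same
mutable instance, any downstream code that holds on to the reference sees
its earlier elements change. MutableRecord and ObjectReuseDemo are
hypothetical names for this plain-Java sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mutable record that a producer may reuse between elements.
final class MutableRecord {
    int value;
}

final class ObjectReuseDemo {

    // Buffers the reference it receives; unsafe when the producer reuses it.
    static List<Integer> bufferWithoutCopy(int n) {
        MutableRecord reused = new MutableRecord();
        List<MutableRecord> buffer = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            reused.value = i;   // producer overwrites the same instance
            buffer.add(reused); // consumer keeps only a reference
        }
        return values(buffer);
    }

    // Buffers a defensive copy of each element; safe under reuse.
    static List<Integer> bufferWithCopy(int n) {
        MutableRecord reused = new MutableRecord();
        List<MutableRecord> buffer = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            reused.value = i;
            MutableRecord copy = new MutableRecord();
            copy.value = reused.value;
            buffer.add(copy);
        }
        return values(buffer);
    }

    private static List<Integer> values(List<MutableRecord> records) {
        List<Integer> out = new ArrayList<>();
        for (MutableRecord r : records) {
            out.add(r.value);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(bufferWithoutCopy(3)); // prints [2, 2, 2]
        System.out.println(bufferWithCopy(3));    // prints [0, 1, 2]
    }
}
```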

Cheers,
Aljoscha

On Mon, 28 Nov 2016 at 07:33 Alexey Demin <[email protected]> wrote:

> Hi
>
> If we try to use sideOutput with a TupleTag together with the Flink config
> enableObjectReuse, we get this stack trace:
>
>         at org.apache.beam.sdk.transforms.DoFnAdapters$SimpleDoFnAdapter.processElement(DoFnAdapters.java:234)
>         at org.apache.beam.runners.core.SimpleOldDoFnRunner.invokeProcessElement(SimpleOldDoFnRunner.java:118)
>         at org.apache.beam.runners.core.SimpleOldDoFnRunner.processElement(SimpleOldDoFnRunner.java:104)
>         at org.apache.beam.runners.core.PushbackSideInputDoFnRunner.processElement(PushbackSideInputDoFnRunner.java:106)
>         at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:265)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:330)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:315)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
>         at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>         at *org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$ParDoBoundMultiStreamingTranslator$1.flatMap(FlinkStreamingTransformTranslators.java:570)*
>         at org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$ParDoBoundMultiStreamingTranslator$1.flatMap(FlinkStreamingTransformTranslators.java:566)
>         at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:48)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:330)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:315)
> *        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:427)*
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:415)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
>         at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$MultiOutputOutputManagerFactory$1.output(DoFnOperator.java:459)
>         at org.apache.beam.runners.core.SimpleOldDoFnRunner$DoFnContext.outputWindowedValue(SimpleOldDoFnRunner.java:270)
>         at org.apache.beam.runners.core.SimpleOldDoFnRunner$DoFnProcessContext.output(SimpleOldDoFnRunner.java:412)
>         at org.apache.beam.sdk.transforms.DoFnAdapters$ProcessContextAdapter.output(DoFnAdapters.java:406)
>
> The most interesting parts are CopyingBroadcastingOutputCollector and
> ParDoBoundMultiStreamingTranslator$1.flatMap(FlinkStreamingTransformTranslators.java:570)
>
> For flat map:
>
>         @SuppressWarnings("unchecked")
>         DataStream filtered =
>             unionOutputStream.flatMap(new FlatMapFunction<RawUnionValue, Object>() {
>               @Override
>               public void flatMap(RawUnionValue value, Collector<Object> out) throws Exception {
>                 if (value.getUnionTag() == outputTag) {
>                   out.collect(value.getValue());
>                 }
>               }
>             }).returns(outputTypeInfo);
>
> As a result, we always serialize and deserialize messages, even if we have
> only one collector for each tag type.
>
> CopyingBroadcastingOutputCollector also has a path for the case where there
> is only one collector on the output:
>
>         public void collect(StreamRecord<T> record) {
>                 for (int i = 0; i < outputs.length - 1; i++) {
>                         Output<StreamRecord<T>> output = outputs[i];
>                         StreamRecord<T> shallowCopy = record.copy(record.getValue());
>                         output.collect(shallowCopy);
>                 }
>
>                 // don't copy for the last output
>                 outputs[outputs.length - 1].collect(record);
>         }
>
> Maybe it would be more efficient to do the tag filtering before
> CopyingBroadcastingOutputCollector and avoid the unnecessary work of copying
> data?
>
>
> Thanks
> Alexey Diomin
>
