Hi Alexey,

I think it should be possible to optimise this particular transformation by using a split/select pattern in Flink (see split and select here: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/datastream_api.html#datastream-transformations). The current implementation is not very optimised; my main goal was to make all features of Beam work before going into individual optimisations.
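
To make the idea concrete, here is a rough sketch in plain Java. It is not Flink or Beam code: the class TagRoutingSketch, the Tagged type, and the copy counter are all made up for illustration, and the copy cost is only a simplified model of what happens between chained operators. It compares handing every element to every per-tag filter (copying for all but the last output) with routing each element directly to the output for its tag.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TagRoutingSketch {

    /** Hypothetical stand-in for a tagged union element: a tag index plus a payload. */
    public static final class Tagged {
        public final int tag;
        public final String value;

        public Tagged(int tag, String value) {
            this.tag = tag;
            this.value = value;
        }
    }

    /** Number of copies made so far; a simplified model of the per-output copy cost. */
    public static int copies = 0;

    static Tagged copy(Tagged t) {
        copies++;
        return new Tagged(t.tag, t.value);
    }

    static Map<Integer, List<String>> emptyOutputs(int numTags) {
        Map<Integer, List<String>> out = new HashMap<>();
        for (int tag = 0; tag < numTags; tag++) {
            out.put(tag, new ArrayList<>());
        }
        return out;
    }

    /** Every element goes to every per-tag filter; all but the last output get a copy. */
    public static Map<Integer, List<String>> broadcastThenFilter(List<Tagged> input, int numTags) {
        Map<Integer, List<String>> out = emptyOutputs(numTags);
        for (Tagged record : input) {
            for (int tag = 0; tag < numTags; tag++) {
                Tagged seen = (tag < numTags - 1) ? copy(record) : record;
                if (seen.tag == tag) {
                    out.get(tag).add(seen.value);
                }
            }
        }
        return out;
    }

    /** Each element is routed only to the output for its own tag; nothing is copied. */
    public static Map<Integer, List<String>> routeByTag(List<Tagged> input, int numTags) {
        Map<Integer, List<String>> out = emptyOutputs(numTags);
        for (Tagged record : input) {
            List<String> target = out.get(record.tag);
            if (target != null) { // elements with an unknown tag are dropped
                target.add(record.value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Tagged> input = Arrays.asList(
                new Tagged(0, "a"), new Tagged(1, "b"), new Tagged(0, "c"));

        copies = 0;
        Map<Integer, List<String>> broadcast = broadcastThenFilter(input, 2);
        int broadcastCopies = copies;

        copies = 0;
        Map<Integer, List<String>> routed = routeByTag(input, 2);

        System.out.println("same outputs: " + broadcast.equals(routed));
        System.out.println("copies with broadcast: " + broadcastCopies);
        System.out.println("copies with routing: " + copies);
    }
}
```

Both variants produce the same per-tag outputs; the difference is only in how many times an element is touched and copied on the way. Whether an actual split/select translation avoids the copies in the same way would need to be checked against the Flink runtime.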
About object-reuse in Flink Streaming: this is a bit of a dangerous setting and can lead to unexpected results with certain patterns. I think your stack trace is not complete; at least, I can't seem to see the root exception.

Cheers,
Aljoscha

On Mon, 28 Nov 2016 at 07:33 Alexey Demin <[email protected]> wrote:

> Hi
>
> If we try to use sideOutput with a TupleTag and the Flink config
> enableObjectReuse, we get this stack trace:
>
> at org.apache.beam.sdk.transforms.DoFnAdapters$SimpleDoFnAdapter.processElement(DoFnAdapters.java:234)
> at org.apache.beam.runners.core.SimpleOldDoFnRunner.invokeProcessElement(SimpleOldDoFnRunner.java:118)
> at org.apache.beam.runners.core.SimpleOldDoFnRunner.processElement(SimpleOldDoFnRunner.java:104)
> at org.apache.beam.runners.core.PushbackSideInputDoFnRunner.processElement(PushbackSideInputDoFnRunner.java:106)
> at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:265)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:330)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:315)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
> at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> at *org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$ParDoBoundMultiStreamingTranslator$1.flatMap(FlinkStreamingTransformTranslators.java:570)*
> at org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$ParDoBoundMultiStreamingTranslator$1.flatMap(FlinkStreamingTransformTranslators.java:566)
> at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:48)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:330)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:315)
> at *org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:427)*
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:415)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
> at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$MultiOutputOutputManagerFactory$1.output(DoFnOperator.java:459)
> at org.apache.beam.runners.core.SimpleOldDoFnRunner$DoFnContext.outputWindowedValue(SimpleOldDoFnRunner.java:270)
> at org.apache.beam.runners.core.SimpleOldDoFnRunner$DoFnProcessContext.output(SimpleOldDoFnRunner.java:412)
> at org.apache.beam.sdk.transforms.DoFnAdapters$ProcessContextAdapter.output(DoFnAdapters.java:406)
>
> The most interesting parts are CopyingBroadcastingOutputCollector and
> 1.flatMap(FlinkStreamingTransformTranslators.java:570).
>
> For the flat map:
>
>     @SuppressWarnings("unchecked")
>     DataStream filtered =
>         unionOutputStream.flatMap(new FlatMapFunction<RawUnionValue, Object>() {
>           @Override
>           public void flatMap(RawUnionValue value, Collector<Object> out) throws Exception {
>             if (value.getUnionTag() == outputTag) {
>               out.collect(value.getValue());
>             }
>           }
>         }).returns(outputTypeInfo);
>
> As a result, we always serialize and deserialize messages, even if we have
> only one collector for each tag type.
>
> CopyingBroadcastingOutputCollector also has a path for when we have only one
> collector on output:
>
>     public void collect(StreamRecord<T> record) {
>       for (int i = 0; i < outputs.length - 1; i++) {
>         Output<StreamRecord<T>> output = outputs[i];
>         StreamRecord<T> shallowCopy = record.copy(record.getValue());
>         output.collect(shallowCopy);
>       }
>
>       // don't copy for the last output
>       outputs[outputs.length - 1].collect(record);
>     }
>
> Maybe it would be more efficient to do the tag filtering before
> CopyingBroadcastingOutputCollector and avoid the unnecessary work of copying
> data?
>
> Thanks
> Alexey Diomin
