Hi, a very simple example: https://gist.github.com/xhumanoid/287af191314d5d867acf509129bd4931
Sometimes we need meta-information about the element being processed.

If I understood the code in FlinkStreamingTransformTranslators (line 557) correctly, the main problem is not in the translators but in the Flink runtime, which doesn't know about tags and simply broadcasts when one transformation has two outputs. Correct me if I'm mistaken.

>> this is a bit of a dangerous setting

I know about the dangers of object-reuse, but we never use an object after collect(). In some cases we need more performance, and serialization on every transformation is very expensive; merging all the business logic into one DoFn would make the processing unmaintainable.

>> I think your stack trace is not complete, at least I can't seem to see the root exception.

We took this stack trace on a live system with jstack. It's not an exception.

Thanks,
Alexey Diomin

2016-11-29 21:33 GMT+04:00 Aljoscha Krettek <aljos...@apache.org>:

> Hi Alexey,
> I think it should be possible to optimise this particular transformation by
> using a split/select pattern in Flink. (See split and select here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/datastream_api.html#datastream-transformations).
> The current implementation is not very optimised, my main goal was to make
> all features of Beam work before going into individual optimisations.
>
> About object-reuse in Flink Streaming: this is a bit of a dangerous setting
> and can lead to unexpected results with certain patterns. I think your
> stack trace is not complete, at least I can't seem to see the root
> exception.
>
> Cheers,
> Aljoscha
>
> On Mon, 28 Nov 2016 at 07:33 Alexey Demin <diomi...@gmail.com> wrote:
>
> > Hi
> >
> > If we try to use sideOutput with TupleTag and the Flink config
> > enableObjectReuse, then we get this stack trace:
> >
> >   at org.apache.beam.sdk.transforms.DoFnAdapters$SimpleDoFnAdapter.processElement(DoFnAdapters.java:234)
> >   at org.apache.beam.runners.core.SimpleOldDoFnRunner.invokeProcessElement(SimpleOldDoFnRunner.java:118)
> >   at org.apache.beam.runners.core.SimpleOldDoFnRunner.processElement(SimpleOldDoFnRunner.java:104)
> >   at org.apache.beam.runners.core.PushbackSideInputDoFnRunner.processElement(PushbackSideInputDoFnRunner.java:106)
> >   at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:265)
> >   at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:330)
> >   at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:315)
> >   at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
> >   at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
> >   at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> >   at *org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$ParDoBoundMultiStreamingTranslator$1.flatMap(FlinkStreamingTransformTranslators.java:570)*
> >   at org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$ParDoBoundMultiStreamingTranslator$1.flatMap(FlinkStreamingTransformTranslators.java:566)
> >   at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:48)
> >   at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:330)
> >   at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:315)
> >   at *org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:427)*
> >   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:415)
> >   at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
> >   at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
> >   at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$MultiOutputOutputManagerFactory$1.output(DoFnOperator.java:459)
> >   at org.apache.beam.runners.core.SimpleOldDoFnRunner$DoFnContext.outputWindowedValue(SimpleOldDoFnRunner.java:270)
> >   at org.apache.beam.runners.core.SimpleOldDoFnRunner$DoFnProcessContext.output(SimpleOldDoFnRunner.java:412)
> >   at org.apache.beam.sdk.transforms.DoFnAdapters$ProcessContextAdapter.output(DoFnAdapters.java:406)
> >
> > The most interesting parts are CopyingBroadcastingOutputCollector and the
> > flatMap at FlinkStreamingTransformTranslators.java:570.
> >
> > For the flat map:
> >
> >   @SuppressWarnings("unchecked")
> >   DataStream filtered =
> >       unionOutputStream.flatMap(new FlatMapFunction<RawUnionValue, Object>() {
> >         @Override
> >         public void flatMap(RawUnionValue value, Collector<Object> out) throws Exception {
> >           if (value.getUnionTag() == outputTag) {
> >             out.collect(value.getValue());
> >           }
> >         }
> >       }).returns(outputTypeInfo);
> >
> > As a result we always serialize and deserialize messages, even when there
> > is only one collector per tag.
> >
> > CopyingBroadcastingOutputCollector does have a no-copy path, but only for
> > the last output:
> >
> >   public void collect(StreamRecord<T> record) {
> >     for (int i = 0; i < outputs.length - 1; i++) {
> >       Output<StreamRecord<T>> output = outputs[i];
> >       StreamRecord<T> shallowCopy = record.copy(record.getValue());
> >       output.collect(shallowCopy);
> >     }
> >
> >     // don't copy for the last output
> >     outputs[outputs.length - 1].collect(record);
> >   }
> >
> > Maybe it would be more efficient to do the tag filtering before
> > CopyingBroadcastingOutputCollector and avoid the unnecessary work of
> > copying the data?
> >
> > Thanks
> > Alexey Diomin
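To make the cost Alexey describes concrete, here is a small self-contained counting model (all names in it — `TagCopySketch`, `TaggedValue`, `copiesWithBroadcast`, `copiesWithTagRouting` — are invented for illustration; these are not the actual Flink or Beam classes). It counts how many record copies the current broadcast-then-filter path makes versus a hypothetical path that routes by union tag before copying:

```java
import java.util.ArrayList;
import java.util.List;

public class TagCopySketch {

    // Minimal stand-in for Beam's RawUnionValue: a value plus a union tag.
    static final class TaggedValue {
        final int tag;
        final Object value;
        TaggedValue(int tag, Object value) {
            this.tag = tag;
            this.value = value;
        }
    }

    // Current behavior: CopyingBroadcastingOutputCollector copies every record
    // for all outputs but the last, and each output's flatMap then drops the
    // records whose tag does not match. Copies made: records * (outputs - 1).
    static int copiesWithBroadcast(List<TaggedValue> records, int numOutputs) {
        return records.size() * (numOutputs - 1);
    }

    // Hypothetical alternative: route each record by its tag first, so a copy
    // is only needed when a single tag fans out to more than one output.
    static int copiesWithTagRouting(List<TaggedValue> records, int[] outputsPerTag) {
        int copies = 0;
        for (TaggedValue r : records) {
            copies += Math.max(outputsPerTag[r.tag] - 1, 0);
        }
        return copies;
    }

    public static void main(String[] args) {
        // 1000 records split evenly across two tags, one downstream output per tag.
        List<TaggedValue> records = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            records.add(new TaggedValue(i % 2, i));
        }
        System.out.println("broadcast copies: " + copiesWithBroadcast(records, 2));
        System.out.println("tag-routed copies: " + copiesWithTagRouting(records, new int[]{1, 1}));
    }
}
```

With one collector per tag, the broadcast path makes one extra copy per record (1000 here) that tag-first routing would avoid entirely, which is the saving the proposal above is after.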