Hi

If we try use sideOutput with TupleTag and flink config enableObjectReuse
then we have stacktrace

        at
org.apache.beam.sdk.transforms.DoFnAdapters$SimpleDoFnAdapter.processElement(DoFnAdapters.java:234)
        at
org.apache.beam.runners.core.SimpleOldDoFnRunner.invokeProcessElement(SimpleOldDoFnRunner.java:118)
        at
org.apache.beam.runners.core.SimpleOldDoFnRunner.processElement(SimpleOldDoFnRunner.java:104)
        at
org.apache.beam.runners.core.PushbackSideInputDoFnRunner.processElement(PushbackSideInputDoFnRunner.java:106)
        at
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:265)
        at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:330)
        at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:315)
        at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
        at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
        at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
        at
*org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$ParDoBoundMultiStreamingTranslator$1.flatMap(FlinkStreamingTransformTranslators.java:570)*
        at
org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$ParDoBoundMultiStreamingTranslator$1.flatMap(FlinkStreamingTransformTranslators.java:566)
        at
org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:48)
        at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:330)
        at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:315)
*        at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:427)*
        at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:415)
        at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
        at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
        at
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$MultiOutputOutputManagerFactory$1.output(DoFnOperator.java:459)
        at
org.apache.beam.runners.core.SimpleOldDoFnRunner$DoFnContext.outputWindowedValue(SimpleOldDoFnRunner.java:270)
        at
org.apache.beam.runners.core.SimpleOldDoFnRunner$DoFnProcessContext.output(SimpleOldDoFnRunner.java:412)
        at
org.apache.beam.sdk.transforms.DoFnAdapters$ProcessContextAdapter.output(DoFnAdapters.java:406)

Most interesting parts it's CopyingBroadcastingOutputCollector and
1.flatMap(FlinkStreamingTransformTranslators.java:570)

For flat map:

        @SuppressWarnings("unchecked")
        DataStream filtered =
            unionOutputStream.flatMap(new FlatMapFunction<RawUnionValue,
Object>() {
              @Override
              public void flatMap(RawUnionValue value, Collector<Object>
out) throws Exception {
                if (value.getUnionTag() == outputTag) {
                  out.collect(value.getValue());
                }
              }
            }).returns(outputTypeInfo);

as result we always serialize-deserialize messages even if we have only 1
collector for each type of tags

CopyingBroadcastingOutputCollector also have path when we have only one
collector on output

             public void collect(StreamRecord<T> record) {
                        for (int i = 0; i < outputs.length - 1; i++) {
                                Output<StreamRecord<T>> output = outputs[i];
                                StreamRecord<T> shallowCopy =
record.copy(record.getValue());
                                output.collect(shallowCopy);
                        }

                        // don't copy for the last output
                        outputs[outputs.length - 1].collect(record);
                }

Maybe more efficiently do tag filtering before
CopyingBroadcastingOutputCollector and avoid unnecessary work with to copy
data ?


Thanks
Alexey Diomin

Reply via email to