Hi,

If we try to use sideOutput with TupleTag together with the Flink config enableObjectReuse, we get this stack trace:
at org.apache.beam.sdk.transforms.DoFnAdapters$SimpleDoFnAdapter.processElement(DoFnAdapters.java:234)
at org.apache.beam.runners.core.SimpleOldDoFnRunner.invokeProcessElement(SimpleOldDoFnRunner.java:118)
at org.apache.beam.runners.core.SimpleOldDoFnRunner.processElement(SimpleOldDoFnRunner.java:104)
at org.apache.beam.runners.core.PushbackSideInputDoFnRunner.processElement(PushbackSideInputDoFnRunner.java:106)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:265)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:330)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:315)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at *org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$ParDoBoundMultiStreamingTranslator$1.flatMap(FlinkStreamingTransformTranslators.java:570)*
at org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$ParDoBoundMultiStreamingTranslator$1.flatMap(FlinkStreamingTransformTranslators.java:566)
at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:48)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:330)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:315)
at *org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:427)*
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:415)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$MultiOutputOutputManagerFactory$1.output(DoFnOperator.java:459)
at org.apache.beam.runners.core.SimpleOldDoFnRunner$DoFnContext.outputWindowedValue(SimpleOldDoFnRunner.java:270)
at org.apache.beam.runners.core.SimpleOldDoFnRunner$DoFnProcessContext.output(SimpleOldDoFnRunner.java:412)
at org.apache.beam.sdk.transforms.DoFnAdapters$ProcessContextAdapter.output(DoFnAdapters.java:406)

The most interesting parts are CopyingBroadcastingOutputCollector and $1.flatMap(FlinkStreamingTransformTranslators.java:570).

For the flatMap:

    @SuppressWarnings("unchecked")
    DataStream filtered =
        unionOutputStream.flatMap(new FlatMapFunction<RawUnionValue, Object>() {
          @Override
          public void flatMap(RawUnionValue value, Collector<Object> out) throws Exception {
            // Forward only the values that belong to this output tag.
            if (value.getUnionTag() == outputTag) {
              out.collect(value.getValue());
            }
          }
        }).returns(outputTypeInfo);

As a result, we always serialize and deserialize messages, even when there is only one collector per tag.

CopyingBroadcastingOutputCollector also has a path that copies the record for every output except the last one:

    public void collect(StreamRecord<T> record) {
      for (int i = 0; i < outputs.length - 1; i++) {
        Output<StreamRecord<T>> output = outputs[i];
        StreamRecord<T> shallowCopy = record.copy(record.getValue());
        output.collect(shallowCopy);
      }
      // don't copy for the last output
      outputs[outputs.length - 1].collect(record);
    }

Maybe it would be more efficient to do the tag filtering before CopyingBroadcastingOutputCollector and avoid the unnecessary work of copying the data?

Thanks,
Alexey Diomin
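To make the suggestion concrete, here is a minimal sketch of the idea (using simplified stand-ins for Flink's Output/StreamRecord and Beam's RawUnionValue, not the real classes; TagRoutingCollector and TaggedValue are hypothetical names): if we route each record by its union tag before any broadcasting happens, exactly one downstream output ever sees it, so no defensive copy is needed at all.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified stand-in for Beam's RawUnionValue: a payload plus a union tag.
class TaggedValue {
    final int unionTag;
    final Object value;

    TaggedValue(int unionTag, Object value) {
        this.unionTag = unionTag;
        this.value = value;
    }
}

// Sketch: route by tag *before* broadcasting. Each record reaches exactly one
// downstream consumer, unlike CopyingBroadcastingOutputCollector, which
// shallow-copies the record for every output but the last and lets each
// per-tag flatMap filter drop the copies it does not want.
class TagRoutingCollector {
    private final List<Consumer<Object>> outputsByTag; // index = union tag

    TagRoutingCollector(List<Consumer<Object>> outputsByTag) {
        this.outputsByTag = outputsByTag;
    }

    void collect(TaggedValue record) {
        // Exactly one consumer receives the value; the others never see it,
        // so there is nothing to defensively copy.
        outputsByTag.get(record.unionTag).accept(record.value);
    }
}

public class TagRoutingDemo {
    public static void main(String[] args) {
        List<Object> evens = new ArrayList<>();
        List<Object> odds = new ArrayList<>();
        TagRoutingCollector router =
            new TagRoutingCollector(List.of(evens::add, odds::add));

        router.collect(new TaggedValue(0, 2));
        router.collect(new TaggedValue(1, 3));
        router.collect(new TaggedValue(0, 4));

        System.out.println(evens); // prints [2, 4]
        System.out.println(odds);  // prints [3]
    }
}
```

The copy in the real collector exists because object reuse means a downstream operator may mutate the record; with tag-based routing there is only ever one consumer per record, so the copy (and the serialize/deserialize round trip) becomes unnecessary.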