Hi
If we try to use sideOutput with a TupleTag together with the Flink config option enableObjectReuse,
we get this stack trace:
    at org.apache.beam.sdk.transforms.DoFnAdapters$SimpleDoFnAdapter.processElement(DoFnAdapters.java:234)
    at org.apache.beam.runners.core.SimpleOldDoFnRunner.invokeProcessElement(SimpleOldDoFnRunner.java:118)
    at org.apache.beam.runners.core.SimpleOldDoFnRunner.processElement(SimpleOldDoFnRunner.java:104)
    at org.apache.beam.runners.core.PushbackSideInputDoFnRunner.processElement(PushbackSideInputDoFnRunner.java:106)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:265)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:330)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:315)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at *org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$ParDoBoundMultiStreamingTranslator$1.flatMap(FlinkStreamingTransformTranslators.java:570)*
    at org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$ParDoBoundMultiStreamingTranslator$1.flatMap(FlinkStreamingTransformTranslators.java:566)
    at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:48)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:330)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:315)
    at *org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:427)*
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:415)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$MultiOutputOutputManagerFactory$1.output(DoFnOperator.java:459)
    at org.apache.beam.runners.core.SimpleOldDoFnRunner$DoFnContext.outputWindowedValue(SimpleOldDoFnRunner.java:270)
    at org.apache.beam.runners.core.SimpleOldDoFnRunner$DoFnProcessContext.output(SimpleOldDoFnRunner.java:412)
    at org.apache.beam.sdk.transforms.DoFnAdapters$ProcessContextAdapter.output(DoFnAdapters.java:406)
The most interesting parts are CopyingBroadcastingOutputCollector and the
$1.flatMap frame at FlinkStreamingTransformTranslators.java:570.
This is the flatMap:
    @SuppressWarnings("unchecked")
    DataStream filtered =
        unionOutputStream.flatMap(new FlatMapFunction<RawUnionValue, Object>() {
          @Override
          public void flatMap(RawUnionValue value, Collector<Object> out) throws Exception {
            if (value.getUnionTag() == outputTag) {
              out.collect(value.getValue());
            }
          }
        }).returns(outputTypeInfo);
As a result, we always serialize and deserialize messages, even when there
is only one collector for each tag.
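To make the fan-out concrete, here is a minimal plain-Java sketch with no Flink or Beam dependency. It assumes the translator creates one filtering flatMap like the one above per output tag and that the union stream is offered to all of them. `TaggedValue` (a stand-in for RawUnionValue), `Result` and `filterPerTag` are hypothetical names. The sketch only counts filter calls: with N tags, every value is examined N times even though exactly one filter keeps it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model (no Flink or Beam classes) of broadcast-then-filter:
// the union stream goes to one filter per output tag, and each filter keeps
// only the values whose tag matches its own. All names are hypothetical.
public class FanOutSketch {

  // Hypothetical stand-in for Beam's RawUnionValue.
  static final class TaggedValue {
    final int unionTag;
    final Object value;

    TaggedValue(int unionTag, Object value) {
      this.unionTag = unionTag;
      this.value = value;
    }
  }

  // What each per-tag filter kept, plus the total number of filter calls.
  static final class Result {
    final List<List<Object>> perTag;
    final long filterCalls;

    Result(List<List<Object>> perTag, long filterCalls) {
      this.perTag = perTag;
      this.filterCalls = filterCalls;
    }
  }

  static Result filterPerTag(List<TaggedValue> input, int numTags) {
    List<List<Object>> perTag = new ArrayList<>();
    for (int tag = 0; tag < numTags; tag++) {
      perTag.add(new ArrayList<>());
    }
    long calls = 0;
    for (TaggedValue v : input) {
      // Every value is offered to every per-tag filter...
      for (int tag = 0; tag < numTags; tag++) {
        calls++;
        // ...but only the filter whose tag matches emits it.
        if (v.unionTag == tag) {
          perTag.get(tag).add(v.value);
        }
      }
    }
    return new Result(perTag, calls);
  }

  public static void main(String[] args) {
    List<TaggedValue> input = Arrays.asList(
        new TaggedValue(0, "a"), new TaggedValue(1, "b"), new TaggedValue(0, "c"));
    Result r = filterPerTag(input, 2);
    // Prints: [[a, c], [b]] after 6 filter calls
    System.out.println(r.perTag + " after " + r.filterCalls + " filter calls");
  }
}
```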
CopyingBroadcastingOutputCollector also has a path that avoids copying when
there is only one collector on the output:
    public void collect(StreamRecord<T> record) {
      for (int i = 0; i < outputs.length - 1; i++) {
        Output<StreamRecord<T>> output = outputs[i];
        StreamRecord<T> shallowCopy = record.copy(record.getValue());
        output.collect(shallowCopy);
      }
      // don't copy for the last output
      outputs[outputs.length - 1].collect(record);
    }
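The copy behaviour of that loop can be modelled the same way. This is a hypothetical plain-Java model, not Flink code: `SimpleRecord` stands in for StreamRecord, and `collect` mirrors the quoted method while also returning how many shallow copies it made. With n outputs it makes n - 1 copies; with a single output it makes none.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model of the collect() loop quoted above (not Flink code):
// with n outputs, each record is shallow-copied n - 1 times and the original
// instance is handed to the last output. SimpleRecord is a hypothetical
// stand-in for Flink's StreamRecord.
public class BroadcastCopySketch {

  static final class SimpleRecord<T> {
    final T value;

    SimpleRecord(T value) {
      this.value = value;
    }

    // Shallow copy: a new wrapper around the same value reference.
    SimpleRecord<T> copy(T newValue) {
      return new SimpleRecord<>(newValue);
    }
  }

  /** Forwards the record to every output and returns how many copies were made. */
  static <T> int collect(SimpleRecord<T> record, List<Consumer<SimpleRecord<T>>> outputs) {
    int copies = 0;
    for (int i = 0; i < outputs.size() - 1; i++) {
      outputs.get(i).accept(record.copy(record.value));
      copies++;
    }
    // Don't copy for the last output.
    outputs.get(outputs.size() - 1).accept(record);
    return copies;
  }

  public static void main(String[] args) {
    List<SimpleRecord<String>> seen = new ArrayList<>();
    List<Consumer<SimpleRecord<String>>> outputs = new ArrayList<>();
    for (int i = 0; i < 3; i++) {
      outputs.add(seen::add);
    }
    SimpleRecord<String> r = new SimpleRecord<>("x");
    int copies = collect(r, outputs);
    // 3 outputs: 2 shallow copies, and the last output receives the original.
    // Prints: 2 copies, last is original: true
    System.out.println(copies + " copies, last is original: " + (seen.get(2) == r));
  }
}
```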
Wouldn't it be more efficient to do the tag filtering before
CopyingBroadcastingOutputCollector and avoid the unnecessary work of copying
data?
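A minimal sketch of that idea, assuming each union tag maps to exactly one downstream consumer: look the consumer up by tag and forward the value once, so there is no broadcast and no per-output copy. `TagRouter` and `TaggedValue` are hypothetical names for illustration, not Beam or Flink APIs, and the sketch does not show how this would be wired into FlinkStreamingTransformTranslators.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Sketch of routing by tag: each union value goes to exactly one downstream
// consumer instead of being broadcast to one filter per tag.
// TaggedValue and TagRouter are hypothetical, not Beam or Flink APIs.
public class TagRouterSketch {

  static final class TaggedValue {
    final int unionTag;
    final Object value;

    TaggedValue(int unionTag, Object value) {
      this.unionTag = unionTag;
      this.value = value;
    }
  }

  static final class TagRouter {
    // Index i holds the consumer for union tag i.
    private final List<Consumer<Object>> consumersByTag;
    private long forwards = 0;

    TagRouter(List<Consumer<Object>> consumersByTag) {
      this.consumersByTag = consumersByTag;
    }

    // One lookup and one forward per value: no filter fan-out and no
    // per-output shallow copies.
    void route(TaggedValue v) {
      consumersByTag.get(v.unionTag).accept(v.value);
      forwards++;
    }

    long forwards() {
      return forwards;
    }
  }

  public static void main(String[] args) {
    List<Object> tag0 = new ArrayList<>();
    List<Object> tag1 = new ArrayList<>();
    List<Consumer<Object>> consumers = new ArrayList<>();
    consumers.add(tag0::add);
    consumers.add(tag1::add);

    TagRouter router = new TagRouter(consumers);
    router.route(new TaggedValue(0, "a"));
    router.route(new TaggedValue(1, "b"));
    router.route(new TaggedValue(0, "c"));
    // Prints: [a, c] [b] after 3 forwards
    System.out.println(tag0 + " " + tag1 + " after " + router.forwards() + " forwards");
  }
}
```

Compared with broadcast-then-filter, each value here costs one list lookup instead of one filter call (and, with the broadcasting collector, one shallow copy) per extra tag. The trade-off is that the routing has to happen in a single place that knows every tag.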
Thanks
Alexey Diomin