Hi

A very simple example:
https://gist.github.com/xhumanoid/287af191314d5d867acf509129bd4931

Sometimes we need meta-information about the processing element.

If I understood the code in FlinkStreamingTransformTranslators (line
557) correctly:
the main problem is not in the translators, but in the Flink runtime, which
doesn't know about tags and simply broadcasts when one transformation has
two outputs.
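To illustrate the cost of that broadcast, here is a self-contained plain-Java sketch (the class and method names are illustrative stand-ins, not the real Beam/Flink classes): a tagged union value is broadcast to one filtering flatMap per tag, so every element is delivered to every filter even though only the matching filter ever emits it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-ins for Beam's RawUnionValue and the per-tag
// filtering flatMap; names are illustrative only.
public class BroadcastSketch {
  static final class UnionValue {
    final int tag;
    final Object value;
    UnionValue(int tag, Object value) { this.tag = tag; this.value = value; }
  }

  // One filter per tag: drops everything that doesn't match its tag.
  static Consumer<UnionValue> tagFilter(int tag, List<Object> out, int[] deliveries) {
    return v -> {
      deliveries[0]++;        // every broadcast element reaches every filter
      if (v.tag == tag) {
        out.add(v.value);     // only the matching filter emits
      }
    };
  }

  // "Broadcast": hand each element to all downstream filters, count deliveries.
  static int broadcast(List<UnionValue> input, int numTags) {
    int[] deliveries = {0};
    List<Consumer<UnionValue>> filters = new ArrayList<>();
    for (int t = 0; t < numTags; t++) {
      filters.add(tagFilter(t, new ArrayList<>(), deliveries));
    }
    for (UnionValue v : input) {
      for (Consumer<UnionValue> f : filters) {
        f.accept(v);
      }
    }
    return deliveries[0];
  }

  public static void main(String[] args) {
    List<UnionValue> input = List.of(
        new UnionValue(0, "a"), new UnionValue(1, "b"), new UnionValue(0, "c"));
    // 3 elements x 2 filters = 6 deliveries, although only 3 values are emitted.
    System.out.println(broadcast(input, 2));
  }
}
```

Routing each element by its tag up front would reduce this to one delivery per element.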

Correct me if I'm mistaken.


>> this is a bit of a dangerous setting

I know about the dangers of object reuse, but we never use an object after
collect.
In some cases we need more performance, and serialization on every
transformation is very expensive,
but merging all business logic into one DoFn would make the processing
unmaintainable.
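For context, the hazard with object reuse is exactly holding on to an element after collect: the runtime may reuse the same instance for the next record. A minimal plain-Java sketch of the failure mode (the names here are illustrative, not Flink's API):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the object-reuse hazard: the "runtime" reuses one
// mutable record instance, so keeping a reference after collect() is unsafe.
public class ObjectReuseSketch {
  static final class Record {
    String value;
  }

  // An operator that (incorrectly) keeps references to collected records.
  static final List<Record> keptReferences = new ArrayList<>();

  static void collect(Record r) {
    keptReferences.add(r);  // unsafe under object reuse
  }

  public static List<String> run(String... inputs) {
    keptReferences.clear();
    Record reused = new Record();   // the runtime's single reused instance
    for (String in : inputs) {
      reused.value = in;            // runtime overwrites the same object
      collect(reused);
    }
    List<String> seen = new ArrayList<>();
    for (Record r : keptReferences) {
      seen.add(r.value);            // every entry now shows the LAST value
    }
    return seen;
  }

  public static void main(String[] args) {
    System.out.println(run("a", "b", "c")); // [c, c, c], not [a, b, c]
  }
}
```

If, as in our case, nothing touches the element after collect, this failure mode cannot occur, which is why we consider the setting safe for our pipelines.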

>> I think your stack trace is not complete, at least I can't seem to see
the root exception.

We took this stack trace on a live system with jstack. It's not an exception.

Thanks,
Alexey Diomin


2016-11-29 21:33 GMT+04:00 Aljoscha Krettek <aljos...@apache.org>:

> Hi Alexey,
> I think it should be possible to optimise this particular transformation by
> using a split/select pattern in Flink. (See split and select here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/datastream_api.html#datastream-transformations).
> The current implementation is not very optimised, my main goal was to make
> all features of Beam work before going into individual optimisations.
>
> About object-reuse in Flink Streaming: this is a bit of a dangerous setting
> and can lead to unexpected results with certain patterns. I think your
> stack trace is not complete, at least I can't seem to see the root
> exception.
>
> Cheers,
> Aljoscha
>
> On Mon, 28 Nov 2016 at 07:33 Alexey Demin <diomi...@gmail.com> wrote:
>
> > Hi
> >
> > If we try to use sideOutput with TupleTag and the Flink config enableObjectReuse,
> > then we get this stack trace:
> >
> >         at org.apache.beam.sdk.transforms.DoFnAdapters$SimpleDoFnAdapter.processElement(DoFnAdapters.java:234)
> >         at org.apache.beam.runners.core.SimpleOldDoFnRunner.invokeProcessElement(SimpleOldDoFnRunner.java:118)
> >         at org.apache.beam.runners.core.SimpleOldDoFnRunner.processElement(SimpleOldDoFnRunner.java:104)
> >         at org.apache.beam.runners.core.PushbackSideInputDoFnRunner.processElement(PushbackSideInputDoFnRunner.java:106)
> >         at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:265)
> >         at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:330)
> >         at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:315)
> >         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
> >         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
> >         at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> >         at *org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$ParDoBoundMultiStreamingTranslator$1.flatMap(FlinkStreamingTransformTranslators.java:570)*
> >         at org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$ParDoBoundMultiStreamingTranslator$1.flatMap(FlinkStreamingTransformTranslators.java:566)
> >         at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:48)
> >         at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:330)
> >         at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:315)
> >         at *org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:427)*
> >         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:415)
> >         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
> >         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
> >         at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$MultiOutputOutputManagerFactory$1.output(DoFnOperator.java:459)
> >         at org.apache.beam.runners.core.SimpleOldDoFnRunner$DoFnContext.outputWindowedValue(SimpleOldDoFnRunner.java:270)
> >         at org.apache.beam.runners.core.SimpleOldDoFnRunner$DoFnProcessContext.output(SimpleOldDoFnRunner.java:412)
> >         at org.apache.beam.sdk.transforms.DoFnAdapters$ProcessContextAdapter.output(DoFnAdapters.java:406)
> >
> > The most interesting parts are CopyingBroadcastingOutputCollector and
> > $1.flatMap(FlinkStreamingTransformTranslators.java:570)
> >
> > For the flatMap:
> >
> >         @SuppressWarnings("unchecked")
> >         DataStream filtered =
> >             unionOutputStream.flatMap(new FlatMapFunction<RawUnionValue, Object>() {
> >               @Override
> >               public void flatMap(RawUnionValue value, Collector<Object> out) throws Exception {
> >                 if (value.getUnionTag() == outputTag) {
> >                   out.collect(value.getValue());
> >                 }
> >               }
> >             }).returns(outputTypeInfo);
> >
> > As a result we always serialize and deserialize messages, even if we have
> > only one collector for each tag.
> >
> > CopyingBroadcastingOutputCollector is also on the path even when we have
> > only one collector on the output:
> >
> >         public void collect(StreamRecord<T> record) {
> >           for (int i = 0; i < outputs.length - 1; i++) {
> >             Output<StreamRecord<T>> output = outputs[i];
> >             StreamRecord<T> shallowCopy = record.copy(record.getValue());
> >             output.collect(shallowCopy);
> >           }
> >
> >           // don't copy for the last output
> >           outputs[outputs.length - 1].collect(record);
> >         }
> >
> > Maybe it would be more efficient to do the tag filtering before
> > CopyingBroadcastingOutputCollector and avoid the unnecessary work of
> > copying the data?
> >
> >
> > Thanks
> > Alexey Diomin
> >
>
