I'm having a look at your PRs now. I think the change is good, and it's
actually quite simple too.

Thanks for looking into this!

On Mon, 5 Dec 2016 at 05:48 Alexey Demin <diomi...@gmail.com> wrote:

> Aljoscha
>
> I was mistaken about the Flink runtime =)
>
> What do you think about this modification to FlinkStreamingTransformTranslators:
>
> move the split out of the for-loop:
>
> SplitStream<RawUnionValue> splitStream = unionOutputStream.split(
>     new OutputSelector<RawUnionValue>() {
>       @Override
>       public Iterable<String> select(RawUnionValue value) {
>         return Lists.newArrayList(String.valueOf(value.getUnionTag()));
>       }
>     });
>
> and change filtered to
>
> DataStream filtered = splitStream.select(String.valueOf(outputTag))
>     .flatMap(new FlatMapFunction<RawUnionValue, Object>() {
>       @Override
>       public void flatMap(RawUnionValue value, Collector<Object> out)
>           throws Exception {
>         out.collect(value.getValue());
>       }
>     }).returns(outputTypeInfo);
>
> With this implementation we transfer data only to the output that needs
> it, without broadcasting every element to all outputs.
>
> p.s. I know this code is not production ready; it's only an idea for
> discussion. But for people who use side outputs only for alerting, it can
> reduce CPU usage (serialization is applied only to the targeted value,
> not to all elements for every output).
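As a rough illustration of the routing idea above, here is a plain-Java sketch, independent of the Flink API (the RawUnionValue below is a simplified stand-in for Beam's class, not the real type): each element is delivered only to the output whose index equals its union tag, instead of being offered to every output.

```java
import java.util.ArrayList;
import java.util.List;

public class TagRouting {

    // Simplified stand-in for Beam's RawUnionValue: a tag plus a payload.
    static final class RawUnionValue {
        final int unionTag;
        final Object value;

        RawUnionValue(int unionTag, Object value) {
            this.unionTag = unionTag;
            this.value = value;
        }
    }

    // Split/select model: each element goes only to outputs[unionTag], so no
    // other output ever sees it (and never has to copy or serialize it).
    static List<List<Object>> route(List<RawUnionValue> elements, int numOutputs) {
        List<List<Object>> outputs = new ArrayList<>();
        for (int i = 0; i < numOutputs; i++) {
            outputs.add(new ArrayList<>());
        }
        for (RawUnionValue e : elements) {
            outputs.get(e.unionTag).add(e.value); // deliver to the matching output only
        }
        return outputs;
    }

    public static void main(String[] args) {
        List<RawUnionValue> elements = new ArrayList<>();
        elements.add(new RawUnionValue(0, "main-1"));
        elements.add(new RawUnionValue(1, "alert-1"));
        elements.add(new RawUnionValue(0, "main-2"));

        List<List<Object>> outputs = route(elements, 2);
        System.out.println(outputs.get(0)); // [main-1, main-2]
        System.out.println(outputs.get(1)); // [alert-1]
    }
}
```

Under this model the serialization cost scales with the number of elements, not elements times outputs, which matches the CPU saving described for the alerting case.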
>
> Thanks,
> Alexey Diomin
>
>
> 2016-12-04 23:57 GMT+04:00 Alexey Demin <diomi...@gmail.com>:
>
> > Hi
> >
> > a very simple example:
> > https://gist.github.com/xhumanoid/287af191314d5d867acf509129bd4931
> >
> > Sometimes we need meta-information about the element being processed.
> >
> > If I understood the code in FlinkStreamingTransformTranslators (line
> > 557) correctly, the main problem is not in the translators but in the
> > Flink runtime, which doesn't know about the tags and simply does a
> > broadcast when one transformation has two outputs.
> >
> > Correct me if I'm mistaken.
> >
> >
> > >> this is a bit of a dangerous setting
> >
> > I know about the dangers of object reuse, but we never use an object
> > after collect.
> > In some cases we need more performance, and serialization on every
> > transformation is very expensive; trying to merge all the business logic
> > into one DoFn would make the processing unmaintainable.
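The object-reuse hazard under discussion can be shown with a minimal plain-Java model (not Flink's actual API): when the runtime reuses one mutable record instance, any consumer that stores a reference instead of reading the payload immediately sees the value silently change. Reading "after collect" is exactly the unsafe case.

```java
import java.util.ArrayList;
import java.util.List;

public class ObjectReuseHazard {

    // A mutable record, reused by the "runtime" for every element.
    static final class Record {
        Object value;
    }

    public static void main(String[] args) {
        Record reused = new Record();
        List<Object> storedRefs = new ArrayList<>();   // keeps references (unsafe)
        List<Object> storedCopies = new ArrayList<>(); // keeps the payloads (safe)

        for (String v : new String[] {"a", "b", "c"}) {
            reused.value = v;               // the runtime overwrites the same instance
            storedRefs.add(reused);         // keeping the reference is the bug
            storedCopies.add(reused.value); // reading the payload right away is fine
        }

        // All three stored references point at the same Record object, whose
        // value is whatever was written last.
        System.out.println(((Record) storedRefs.get(0)).value); // c
        System.out.println(storedCopies); // [a, b, c]
    }
}
```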
> >
> > >> I think your stack trace is not complete, at least I can't seem to see
> > the root exception.
> >
> > We took this stack trace on a live system with jstack. It's not an exception.
> >
> > Thanks,
> > Alexey Diomin
> >
> >
> > 2016-11-29 21:33 GMT+04:00 Aljoscha Krettek <aljos...@apache.org>:
> >
> >> Hi Alexey,
> >> I think it should be possible to optimise this particular transformation
> >> by using a split/select pattern in Flink. (See split and select here:
> >> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/datastream_api.html#datastream-transformations).
> >> The current implementation is not very optimised; my main goal was to
> >> make all features of Beam work before going into individual
> >> optimisations.
> >>
> >> About object-reuse in Flink Streaming: this is a bit of a dangerous
> >> setting
> >> and can lead to unexpected results with certain patterns. I think your
> >> stack trace is not complete, at least I can't seem to see the root
> >> exception.
> >>
> >> Cheers,
> >> Aljoscha
> >>
> >> On Mon, 28 Nov 2016 at 07:33 Alexey Demin <diomi...@gmail.com> wrote:
> >>
> >> > Hi
> >> >
> >> > If we try to use sideOutput with a TupleTag and the Flink config
> >> > enableObjectReuse, then we get this stack trace:
> >> >
> >> >         at org.apache.beam.sdk.transforms.DoFnAdapters$SimpleDoFnAdapter.processElement(DoFnAdapters.java:234)
> >> >         at org.apache.beam.runners.core.SimpleOldDoFnRunner.invokeProcessElement(SimpleOldDoFnRunner.java:118)
> >> >         at org.apache.beam.runners.core.SimpleOldDoFnRunner.processElement(SimpleOldDoFnRunner.java:104)
> >> >         at org.apache.beam.runners.core.PushbackSideInputDoFnRunner.processElement(PushbackSideInputDoFnRunner.java:106)
> >> >         at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:265)
> >> >         at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:330)
> >> >         at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:315)
> >> >         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
> >> >         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
> >> >         at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> >> >         at *org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$ParDoBoundMultiStreamingTranslator$1.flatMap(FlinkStreamingTransformTranslators.java:570)*
> >> >         at org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$ParDoBoundMultiStreamingTranslator$1.flatMap(FlinkStreamingTransformTranslators.java:566)
> >> >         at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:48)
> >> >         at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:330)
> >> >         at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:315)
> >> >         at *org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:427)*
> >> >         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:415)
> >> >         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
> >> >         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
> >> >         at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$MultiOutputOutputManagerFactory$1.output(DoFnOperator.java:459)
> >> >         at org.apache.beam.runners.core.SimpleOldDoFnRunner$DoFnContext.outputWindowedValue(SimpleOldDoFnRunner.java:270)
> >> >         at org.apache.beam.runners.core.SimpleOldDoFnRunner$DoFnProcessContext.output(SimpleOldDoFnRunner.java:412)
> >> >         at org.apache.beam.sdk.transforms.DoFnAdapters$ProcessContextAdapter.output(DoFnAdapters.java:406)
> >> >
> >> > The most interesting parts are CopyingBroadcastingOutputCollector and
> >> > ParDoBoundMultiStreamingTranslator$1.flatMap(FlinkStreamingTransformTranslators.java:570).
> >> >
> >> > For the flatMap:
> >> >
> >> >         @SuppressWarnings("unchecked")
> >> >         DataStream filtered =
> >> >             unionOutputStream.flatMap(new FlatMapFunction<RawUnionValue, Object>() {
> >> >               @Override
> >> >               public void flatMap(RawUnionValue value, Collector<Object> out)
> >> >                   throws Exception {
> >> >                 if (value.getUnionTag() == outputTag) {
> >> >                   out.collect(value.getValue());
> >> >                 }
> >> >               }
> >> >             }).returns(outputTypeInfo);
> >> >
> >> > As a result, we always serialize/deserialize messages even if we have
> >> > only one collector for each type of tag.
> >> >
> >> > CopyingBroadcastingOutputCollector also has a path for the case when
> >> > there is only one collector on the output:
> >> >
> >> >         public void collect(StreamRecord<T> record) {
> >> >           for (int i = 0; i < outputs.length - 1; i++) {
> >> >             Output<StreamRecord<T>> output = outputs[i];
> >> >             StreamRecord<T> shallowCopy = record.copy(record.getValue());
> >> >             output.collect(shallowCopy);
> >> >           }
> >> >
> >> >           // don't copy for the last output
> >> >           outputs[outputs.length - 1].collect(record);
> >> >         }
> >> >
> >> > Maybe it would be more efficient to do the tag filtering before
> >> > CopyingBroadcastingOutputCollector and avoid the unnecessary work of
> >> > copying the data?
> >> >
> >> >
> >> > Thanks
> >> > Alexey Diomin
> >> >
> >>
> >
> >
>
