I'm having a look at your PRs now. I think the change is good, and it's actually quite simple too.
Thanks for looking into this!

On Mon, 5 Dec 2016 at 05:48 Alexey Demin <diomi...@gmail.com> wrote:

> Aljoscha
>
> I was mistaken about the Flink runtime =)
>
> What do you think about the following modification to
> FlinkStreamingTransformTranslators?
>
> Move the split out of the for-loop:
>
> SplitStream<RawUnionValue> splitStream = unionOutputStream.split(new
>     OutputSelector<RawUnionValue>() {
>       @Override
>       public Iterable<String> select(RawUnionValue value) {
>         return Lists.newArrayList(String.valueOf(value.getUnionTag()));
>       }
>     });
>
> and change filtered to:
>
> DataStream filtered = splitStream.select(String.valueOf(outputTag))
>     .flatMap(new FlatMapFunction<RawUnionValue, Object>() {
>       @Override
>       public void flatMap(RawUnionValue value, Collector<Object> out)
>           throws Exception {
>         out.collect(value.getValue());
>       }
>     }).returns(outputTypeInfo);
>
> With this implementation we only transfer data to the output that needs
> it, instead of broadcasting every element to all outputs.
>
> p.s. I know this code is not production ready; it's only an idea for
> discussion. But for people who use side outputs only for alerting it can
> reduce CPU usage (serialization is applied only to the targeted value,
> not to all elements for every output).
>
> Thanks,
> Alexey Diomin
>
>
> 2016-12-04 23:57 GMT+04:00 Alexey Demin <diomi...@gmail.com>:
>
> > Hi
> >
> > Very simple example:
> > https://gist.github.com/xhumanoid/287af191314d5d867acf509129bd4931
> >
> > Sometimes we need meta-information about the element being processed.
> >
> > If I understood the code in FlinkStreamingTransformTranslators (line
> > 557) correctly, the main problem is not in the translators but in the
> > Flink runtime, which doesn't know about tags and simply broadcasts
> > when one transformation has two outputs.
> >
> > Correct me if I'm mistaken.
> >
> > >> this is a bit of a dangerous setting
> >
> > I know about the dangers of object reuse, but we never use an object
> > after collect.
> > In some cases we need more performance, and serialization on every
> > transformation is very expensive, but merging all business logic into
> > one DoFn would make the processing unmaintainable.
> >
> > >> I think your stack trace is not complete, at least I can't seem to
> > >> see the root exception.
> >
> > We captured this stack trace on a live system with jstack. It's not an
> > exception.
> >
> > Thanks,
> > Alexey Diomin
> >
> >
> > 2016-11-29 21:33 GMT+04:00 Aljoscha Krettek <aljos...@apache.org>:
> >
> >> Hi Alexey,
> >> I think it should be possible to optimise this particular
> >> transformation by using a split/select pattern in Flink. (See split
> >> and select here:
> >> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/datastream_api.html#datastream-transformations)
> >> The current implementation is not very optimised; my main goal was to
> >> make all features of Beam work before going into individual
> >> optimisations.
> >>
> >> About object-reuse in Flink Streaming: this is a bit of a dangerous
> >> setting and can lead to unexpected results with certain patterns. I
> >> think your stack trace is not complete, at least I can't seem to see
> >> the root exception.
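The object-reuse danger mentioned above can be sketched without Flink. The following is a simplified, hypothetical illustration (the Event class and the buffering loop are invented for this example; this is not Flink or Beam code): when the runtime hands the same instance to every invocation instead of a fresh deserialized copy, an operator that keeps the element by reference ends up with aliased state.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified illustration (not Flink's actual code): with object reuse
// enabled, the runtime passes the SAME instance to each invocation rather
// than a fresh deserialized copy. An operator that buffers the element by
// reference then sees every buffered entry overwritten by the last value.
public class ObjectReuseHazard {
    static final class Event {
        long value;
        Event(long value) { this.value = value; }
    }

    // Emits values 1..n through a single reused Event instance and buffers
    // each "received" element by reference, as a naive operator might.
    static List<Event> bufferWithReuse(int n) {
        List<Event> buffered = new ArrayList<>();
        Event reused = new Event(0);
        for (int v = 1; v <= n; v++) {
            reused.value = v;     // runtime mutates the reused object
            buffered.add(reused); // operator keeps a reference, not a copy
        }
        return buffered;
    }

    public static void main(String[] args) {
        // Every buffered entry aliases one object holding the last value.
        for (Event e : bufferWithReuse(3)) {
            System.out.println(e.value); // prints 3 three times
        }
    }
}
```

This is why copying outputs (as CopyingBroadcastingOutputCollector does) is the safe default, and why skipping the copy is only sound if no one downstream holds on to the element.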
> >>
> >> Cheers,
> >> Aljoscha
> >>
> >> On Mon, 28 Nov 2016 at 07:33 Alexey Demin <diomi...@gmail.com> wrote:
> >>
> >> > Hi
> >> >
> >> > If we use sideOutput with a TupleTag and the Flink config option
> >> > enableObjectReuse, we get the following stack trace:
> >> >
> >> > at org.apache.beam.sdk.transforms.DoFnAdapters$SimpleDoFnAdapter.processElement(DoFnAdapters.java:234)
> >> > at org.apache.beam.runners.core.SimpleOldDoFnRunner.invokeProcessElement(SimpleOldDoFnRunner.java:118)
> >> > at org.apache.beam.runners.core.SimpleOldDoFnRunner.processElement(SimpleOldDoFnRunner.java:104)
> >> > at org.apache.beam.runners.core.PushbackSideInputDoFnRunner.processElement(PushbackSideInputDoFnRunner.java:106)
> >> > at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:265)
> >> > at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:330)
> >> > at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:315)
> >> > at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
> >> > at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
> >> > at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> >> > *at org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$ParDoBoundMultiStreamingTranslator$1.flatMap(FlinkStreamingTransformTranslators.java:570)*
> >> > at org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$ParDoBoundMultiStreamingTranslator$1.flatMap(FlinkStreamingTransformTranslators.java:566)
> >> > at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:48)
> >> > at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:330)
> >> > at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:315)
> >> > *at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:427)*
> >> > at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:415)
> >> > at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
> >> > at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
> >> > at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$MultiOutputOutputManagerFactory$1.output(DoFnOperator.java:459)
> >> > at org.apache.beam.runners.core.SimpleOldDoFnRunner$DoFnContext.outputWindowedValue(SimpleOldDoFnRunner.java:270)
> >> > at org.apache.beam.runners.core.SimpleOldDoFnRunner$DoFnProcessContext.output(SimpleOldDoFnRunner.java:412)
> >> > at org.apache.beam.sdk.transforms.DoFnAdapters$ProcessContextAdapter.output(DoFnAdapters.java:406)
> >> >
> >> > The most interesting parts are CopyingBroadcastingOutputCollector and
> >> > the flatMap at FlinkStreamingTransformTranslators.java:570 (both
> >> > marked above).
> >> >
> >> > For the flat map:
> >> >
> >> > @SuppressWarnings("unchecked")
> >> > DataStream filtered =
> >> >     unionOutputStream.flatMap(new FlatMapFunction<RawUnionValue, Object>() {
> >> >       @Override
> >> >       public void flatMap(RawUnionValue value, Collector<Object> out)
> >> >           throws Exception {
> >> >         if (value.getUnionTag() == outputTag) {
> >> >           out.collect(value.getValue());
> >> >         }
> >> >       }
> >> >     }).returns(outputTypeInfo);
> >> >
> >> > As a result we always serialize and deserialize messages, even when
> >> > there is only one collector for each tag.
> >> >
> >> > CopyingBroadcastingOutputCollector also takes this path when we have
> >> > only one collector on the output:
> >> >
> >> > public void collect(StreamRecord<T> record) {
> >> >   for (int i = 0; i < outputs.length - 1; i++) {
> >> >     Output<StreamRecord<T>> output = outputs[i];
> >> >     StreamRecord<T> shallowCopy = record.copy(record.getValue());
> >> >     output.collect(shallowCopy);
> >> >   }
> >> >
> >> >   // don't copy for the last output
> >> >   outputs[outputs.length - 1].collect(record);
> >> > }
> >> >
> >> > Maybe it would be more efficient to do the tag filtering before
> >> > CopyingBroadcastingOutputCollector and avoid the unnecessary copying
> >> > of data?
> >> >
> >> > Thanks,
> >> > Alexey Diomin
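The optimisation proposed in the thread, filtering by union tag before fan-out rather than broadcasting every element to every output and filtering afterwards, can be sketched outside Flink. The types and method below (TaggedValue, routeByTag) are hypothetical stand-ins for RawUnionValue and the per-tag downstream outputs, not actual Beam or Flink APIs:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch (hypothetical types, not Beam/Flink code): route each tagged
// element only to the output registered for its tag. Compared with
// broadcast-then-filter, no copy or serialization work is performed for
// the outputs whose tag does not match.
public class TagRouting {
    static final class TaggedValue {
        final int tag;
        final Object value;
        TaggedValue(int tag, Object value) { this.tag = tag; this.value = value; }
    }

    // One sink list per tag, standing in for per-tag downstream collectors.
    static Map<Integer, List<Object>> routeByTag(List<TaggedValue> input, int... tags) {
        Map<Integer, List<Object>> outputs = new HashMap<>();
        for (int tag : tags) {
            outputs.put(tag, new ArrayList<>());
        }
        for (TaggedValue tv : input) {
            // Only the matching output receives the element; the other
            // outputs never see it, so nothing is copied for them.
            List<Object> sink = outputs.get(tv.tag);
            if (sink != null) {
                sink.add(tv.value);
            }
        }
        return outputs;
    }

    public static void main(String[] args) {
        List<TaggedValue> input = new ArrayList<>();
        input.add(new TaggedValue(0, "main-1"));
        input.add(new TaggedValue(1, "side-1"));
        input.add(new TaggedValue(0, "main-2"));

        Map<Integer, List<Object>> outputs = routeByTag(input, 0, 1);
        System.out.println(outputs.get(0)); // [main-1, main-2]
        System.out.println(outputs.get(1)); // [side-1]
    }
}
```

This mirrors what the quoted split/select proposal achieves inside Flink: the split happens once, keyed on the union tag, and each select only pulls the elements destined for it.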