I converted the stream to this:

    SingleOutputStreamOperator<Tuple2<Integer, XMPP>> tuple2Stream =
        sourceStream.map(new RichMapFunction<XMPP, Tuple2<Integer, XMPP>>() {
            @Override
            public Tuple2<Integer, XMPP> map(XMPP value) throws Exception {
                return Tuple2.of(getRuntimeContext().getIndexOfThisSubtask(), value);
            }
        });
    DataStreamUtils.reinterpretAsKeyedStream(tuple2Stream, (t) -> t.f0);
An ugly hack, but it works.

On Thu, 10 Jan 2019 at 10:54, CPC <acha...@gmail.com> wrote:

> Hi Ken,
>
> I am doing a global distinct. What I want to achieve is something like
> the below. With windowAll, Flink sends all data to a single operator,
> which means shuffling all the data and computing with parallelism 1. I
> don't want to shuffle the raw data, since I just want to feed it into an
> HLL instance, shuffle only the HLL instances at the end of the window,
> and merge them. This is exactly the same scenario as a global count.
> Suppose you want to count events for each 1-minute window. Currently we
> would have to send all data to a single operator and count there. Instead,
> we could compute subtotals and then send those subtotals to a single
> operator and merge them there.
>
> [image: image.png]
>
> On Thu, 10 Jan 2019 at 02:26, Ken Krugler <kkrugler_li...@transpac.com>
> wrote:
>
>> On Jan 9, 2019, at 3:10 PM, CPC <acha...@gmail.com> wrote:
>>
>> Hi Ken,
>>
>> By regular time-based windows, do you mean keyed windows?
>>
>>
>> Correct. Without doing a keyBy() you would have a parallelism of 1.
>>
>> I think you want to key on whatever you're counting for unique values,
>> so that each window operator gets a slice of the unique values.
>>
>> — Ken
>>
>> On Wed, Jan 9, 2019, 10:22 PM Ken Krugler <kkrugler_li...@transpac.com>
>> wrote:
>>
>>> Hi there,
>>>
>>> You should be able to use a regular time-based window(), and emit the
>>> HyperLogLog binary data as your result, which then would get merged in
>>> your custom function (which you set a parallelism of 1 on).
>>>
>>> Note that if you are generating unique counts per non-overlapping time
>>> window, you'll need to keep N HLL structures in each operator.
>>>
>>> — Ken
>>>
>>>
>>> On Jan 9, 2019, at 10:26 AM, CPC <acha...@gmail.com> wrote:
>>>
>>> Hi Stefan,
>>>
>>> Could I use the "Reinterpreting a pre-partitioned data stream as keyed
>>> stream" feature for this?
>>>
>>> On Wed, 9 Jan 2019 at 17:50, Stefan Richter <s.rich...@da-platform.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I think your expectation about windowAll is wrong; from the method
>>>> documentation: "Note: This operation is inherently non-parallel since
>>>> all elements have to pass through the same operator instance". I also
>>>> cannot think of a way in which the windowing API would support your use
>>>> case without a shuffle. You could probably build the functionality by
>>>> hand, though, but I guess this is not quite what you want.
>>>>
>>>> Best,
>>>> Stefan
>>>>
>>>> > On 9. Jan 2019, at 13:43, CPC <acha...@gmail.com> wrote:
>>>> >
>>>> > Hi all,
>>>> >
>>>> > In our implementation, we are consuming from Kafka and calculating a
>>>> distinct count with HyperLogLog. We are using the windowAll function
>>>> with a custom AggregateFunction, but the Flink runtime shows somewhat
>>>> unexpected behavior. Our sources run with parallelism 4, and I expected
>>>> the add function to run after the sources to compute partial results,
>>>> and at the end of the window to send 4 HLL objects to a single operator
>>>> to be merged there (merge function). Instead, it sends all data to a
>>>> single instance and calls the add function there.
>>>> >
>>>> > Is there any way to make Flink behave like this? I mean, compute
>>>> partial results after consuming from Kafka with the parallelism of the
>>>> sources, without shuffling (so part of the calculation can run in
>>>> parallel), and then merge those partial results with a merge function?
>>>> >
>>>> > Thank you in advance...
>>>>
>>> --------------------------
>>> Ken Krugler
>>> +1 530-210-6378
>>> http://www.scaleunlimited.com
>>> Custom big data solutions & training
>>> Flink, Solr, Hadoop, Cascading & Cassandra
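For reference, the two-phase pattern discussed in this thread (each source subtask pre-aggregates its own partition locally, and only the small sketches are shuffled and merged at window end) can be sketched in plain Java. This is an illustration of the idea, not Flink code: the class and method names are made up for this sketch, and a HashSet stands in for a real HyperLogLog structure.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Plain-Java sketch of the partial-aggregate-then-merge distinct count.
// Phase 1 runs once per "source subtask" (no shuffle of raw events);
// phase 2 runs in a single place, merging only the small sketches.
// A HashSet stands in for HyperLogLog; mergeAndCount() plays the role
// of merging HLL registers and reading the estimate.
public class PartialDistinct {

    // Phase 1: each parallel source instance builds a partial sketch
    // from its own partition of the events.
    static Set<String> partialSketch(List<String> events) {
        return new HashSet<>(events);
    }

    // Phase 2: a single operator merges the partial sketches and
    // produces the global distinct count.
    static long mergeAndCount(List<Set<String>> partials) {
        Set<String> merged = new HashSet<>();
        for (Set<String> sketch : partials) {
            merged.addAll(sketch); // with real HLL: merged.merge(sketch)
        }
        return merged.size();
    }

    public static void main(String[] args) {
        // Four "source subtasks" with overlapping values across partitions.
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("a", "b", "c"),
                Arrays.asList("b", "c", "d"),
                Arrays.asList("d", "e"),
                Arrays.asList("a", "e", "f"));

        List<Set<String>> partials = new ArrayList<>();
        for (List<String> partition : partitions) {
            partials.add(partialSketch(partition));
        }
        // Only 4 small sketches cross the network, not the raw events.
        System.out.println("distinct = " + mergeAndCount(partials)); // prints "distinct = 6"
    }
}
```

The same shape is what a combiner-style pre-aggregation gives you: the raw events stay where they were consumed, and only one sketch per subtask per window is sent to the parallelism-1 merge step.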