OK, thanks for the suggestion, but I think I'll wait for a later Flink version before migrating from DataSet to DataStream. In my experience it is very helpful to have open/close on all operators.
Best,
Flavio

On Tue, Jul 28, 2020 at 8:51 AM Aljoscha Krettek wrote:

> I think that should work with an aggregate() instead of reduce().
>
> Best,
> Aljoscha
>
> On 24.07.20 17:02, Flavio Pompermaier wrote:
> > In my reduce function I want to compute some aggregation on the sub-results
> > of a map-partition (that I tried to migrate from DataSet to DataStream
> > without success).
> > The original code was something like:
> >
> > input.mapPartition(new RowToStringSketches(sketchMapSize)) //
> >     .reduce(new SketchesStringReducer()) //
> >     .map(new SketchesStringToStatsPojo(colIndex, topK));
> >
> > I asked about emulating the mapPartition function in the streaming
> > environment in another thread on the mailing list [1] because I was not
> > able to test it: it seems that the program was exiting before being able
> > to process anything.
> > So I gave up on replacing the DataSet API with the DataStream API for the
> > moment; it seems that there are still too many things to migrate.
> >
> > Btw, this is the reduce function:
> >
> > public class SketchesStringReducer extends RichReduceFunction<Tuple2<byte[], byte[]>> {
> >     private static final long serialVersionUID = 1L;
> >
> >     private transient ArrayOfItemsSerDe<String> serDe;
> >
> >     @Override
> >     public void open(Configuration parameters) throws Exception {
> >         this.serDe = new ArrayOfStringsSerDe();
> >     }
> >
> >     @Override
> >     public Tuple2<byte[], byte[]> reduce(Tuple2<byte[], byte[]> t1, Tuple2<byte[], byte[]> t2)
> >             throws Exception {
> >         // merge HLL
> >         final HllSketch hll1 = HllSketch.heapify(Memory.wrap(t1.f0));
> >         final HllSketch hll2 = HllSketch.heapify(Memory.wrap(t2.f0));
> >         final Union union = new Union(hll1.getLgConfigK());
> >         union.update(hll1);
> >         union.update(hll2);
> >         final byte[] hllSketchBytes = union.getResult().toCompactByteArray();
> >
> >         // merge Item
> >         final ItemsSketch<String> s1 = ItemsSketch.getInstance(Memory.wrap(t1.f1), serDe);
> >         final ItemsSketch<String> s2 = ItemsSketch.getInstance(Memory.wrap(t2.f1), serDe);
> >         final byte[] itemSketchBytes = s1.merge(s2).toByteArray(serDe);
> >         return new Tuple2<>(hllSketchBytes, itemSketchBytes);
> >     }
> > }
> >
> > [1]
> > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Is-there-a-way-to-use-stream-API-with-this-program-td36715.html#a36767
> >
> > On Mon, Jul 20, 2020 at 6:32 PM Aljoscha Krettek wrote:
> >
> >> What are you trying to do in the ReduceFunction? Without knowing the
> >> code, maybe an aggregate(AggregateFunction) is the solution.
> >>
> >> Best,
> >> Aljoscha
> >>
> >> On 20.07.20 18:03, Flavio Pompermaier wrote:
> >>> Thanks, Aljoscha, for the reply. So what can I do in my reduce function
> >>> that contains transient variables (i.e. not serializable)?
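One possible answer to the question about transient (non-serializable) fields, without relying on open(), is to create the helper lazily on first use. This is a stdlib-only sketch under stated assumptions: `PlainReduceFunction` is a hypothetical stand-in for Flink's `ReduceFunction`, `StringSetSerDe` is a hypothetical non-serializable helper playing the role of `ArrayOfStringsSerDe`, and a Java serialization round trip only approximates how a function instance is shipped to a worker. The same pattern should carry over to an `AggregateFunction`, since the windowed `aggregate()` also refuses rich functions.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.TreeSet;

// Hypothetical stand-in for Flink's ReduceFunction<T>: serializable, no open()/close().
interface PlainReduceFunction<T> extends Serializable {
    T reduce(T value1, T value2) throws Exception;
}

// Hypothetical helper that, like the serDe in the thread, is NOT Serializable.
// Toy encoding: items joined by '\n', so items must not contain '\n'.
final class StringSetSerDe {
    byte[] serialize(TreeSet<String> items) {
        return String.join("\n", items).getBytes(StandardCharsets.UTF_8);
    }

    TreeSet<String> deserialize(byte[] bytes) {
        TreeSet<String> items = new TreeSet<>();
        if (bytes.length > 0) {
            items.addAll(Arrays.asList(new String(bytes, StandardCharsets.UTF_8).split("\n")));
        }
        return items;
    }
}

// Merges two serialized string sets; the serDe is created on first use instead of in open().
final class LazySerDeReducer implements PlainReduceFunction<byte[]> {
    private static final long serialVersionUID = 1L;

    // transient: skipped by Java serialization, so it is null after deserialization.
    private transient StringSetSerDe serDe;

    private StringSetSerDe serDe() {
        if (serDe == null) {
            serDe = new StringSetSerDe(); // first use on this instance plays the role of open()
        }
        return serDe;
    }

    @Override
    public byte[] reduce(byte[] value1, byte[] value2) {
        TreeSet<String> merged = serDe().deserialize(value1);
        merged.addAll(serDe().deserialize(value2));
        return serDe().serialize(merged);
    }
}

final class LazySerDeDemo {
    // Java-serializes and deserializes an object, roughly like shipping a function to a worker.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    static byte[] bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws Exception {
        LazySerDeReducer reducer = new LazySerDeReducer();
        reducer.reduce(bytes("a"), bytes("b")); // initializes serDe on the original instance
        // Without 'transient' this would throw NotSerializableException.
        PlainReduceFunction<byte[]> shipped = roundTrip(reducer);
        byte[] result = shipped.reduce(bytes("a\nc"), bytes("b\nc"));
        System.out.println(new String(result, StandardCharsets.UTF_8).replace("\n", ",")); // prints a,b,c
    }
}
```

Creating the helper per function instance is only reasonable when it is cheap and stateless, as a serializer/deserializer usually is; for such a helper a `static final` field would be another option.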
> >>>
> >>> On Mon, Jul 20, 2020 at 4:38 PM Aljoscha Krettek wrote:
> >>>
> >>>> Hi Flavio,
> >>>>
> >>>> the reason is that under the covers the ReduceFunction will be used as
> >>>> the ReduceFunction of a ReducingState. And those cannot be rich
> >>>> functions because we cannot provide all the required context "inside"
> >>>> the state backend.
> >>>>
> >>>> You can see how the ReduceFunction is used to create a
> >>>> ReducingStateDescriptor here:
> >>>>
> >>>> https://github.com/apache/flink/blob/0c43649882c831c1ec88f4e33d8a59b1cbf5f2fe/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java#L300
> >>>>
> >>>> Best,
> >>>> Aljoscha
> >>>>
> >>>> On 16.07.20 16:28, Flavio Pompermaier wrote:
> >>>>> Hi to all,
> >>>>> I'm trying to apply a rich reduce function after a countWindowAll but
> >>>>> Flink says
> >>>>> "ReduceFunction of reduce can not be a RichFunction. Please use
> >>>>> reduce(ReduceFunction, WindowFunction) instead."
> >>>>>
> >>>>> Is there any good reason for this? Or am I doing something wrong?
> >>>>>
> >>>>> Best,
> >>>>> Flavio
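Aljoscha's explanation can be illustrated with a toy model built from hypothetical classes; it is not Flink's implementation. The state only stores a value and calls a plain reduce function on every `add()`, so it has no point at which it could call `open()`. The operator owns the lifecycle, which is why the error message points to `reduce(ReduceFunction, WindowFunction)`: the incremental pre-aggregation stays plain, while the window function (which in Flink may be a rich function, e.g. a `ProcessAllWindowFunction`) receives only the reduced result when the window fires.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Toy ReducingState: holds one value and a plain reduce function, nothing else.
final class ToyReducingState<T> {
    private final BinaryOperator<T> reduceFunction;
    private T value; // null means "empty"

    ToyReducingState(BinaryOperator<T> reduceFunction) {
        this.reduceFunction = reduceFunction;
    }

    void add(T element) {
        // The reduce function is called right here, inside the state, with no lifecycle hooks.
        value = (value == null) ? element : reduceFunction.apply(value, element);
    }

    T get() {
        return value;
    }

    void clear() {
        value = null;
    }
}

// Toy stand-in for a rich window function: it has open() and sees only the reduced result.
abstract class ToyRichWindowFunction<T, R> {
    void open() {}

    abstract R apply(T reduced);
}

// Toy operator mimicking countWindowAll(size).reduce(reduceFn, windowFn).
final class ToyCountWindowOperator<T, R> {
    private final int size;
    private final ToyReducingState<T> state;
    private final ToyRichWindowFunction<T, R> windowFunction;
    private int count;
    final List<R> output = new ArrayList<>();

    ToyCountWindowOperator(int size, BinaryOperator<T> reduceFn, ToyRichWindowFunction<T, R> windowFn) {
        this.size = size;
        this.state = new ToyReducingState<>(reduceFn);
        this.windowFunction = windowFn;
        windowFn.open(); // the operator, not the state, calls the lifecycle method
    }

    void processElement(T element) {
        state.add(element); // incremental pre-aggregation with the plain function
        count++;
        if (count == size) { // the count window fires
            output.add(windowFunction.apply(state.get()));
            state.clear();
            count = 0;
        }
    }
}

final class ToyWindowDemo {
    public static void main(String[] args) {
        ToyRichWindowFunction<Integer, String> formatter = new ToyRichWindowFunction<Integer, String>() {
            private String prefix; // initialized in open(), like a transient field

            @Override
            void open() {
                prefix = "sum=";
            }

            @Override
            String apply(Integer reduced) {
                return prefix + reduced;
            }
        };
        ToyCountWindowOperator<Integer, String> op = new ToyCountWindowOperator<>(3, Integer::sum, formatter);
        for (int i = 1; i <= 6; i++) {
            op.processElement(i);
        }
        System.out.println(op.output); // prints [sum=6, sum=15]
    }
}
```

In the thread's pipeline, the final `SketchesStringToStatsPojo` step could move into such a window function, while the sketch merging itself would still need a non-rich reducer.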
