Ok, thanks for the suggestion, but I think I'll wait for a later Flink
version before migrating from the DataSet API to the DataStream API...
In my experience it is very helpful to have open/close on all operators.

Best,
Flavio

On Tue, Jul 28, 2020 at 8:51 AM Aljoscha Krettek <[email protected]>
wrote:

> I think that should work with an aggregate() instead of reduce().
>
> Best,
> Aljoscha
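Here is a minimal sketch of what the aggregate() suggestion could look like, under stated assumptions. Instead of deserializing and re-serializing two `Tuple2<byte[], byte[]>` values on every call, an aggregate function keeps a live accumulator and converts it only once, in `getResult()`. So that it runs with the JDK alone, the sketch makes two substitutions:

- The `Aggregator` interface is a hypothetical stand-in that mirrors the four methods of Flink's `AggregateFunction<IN, ACC, OUT>` (`createAccumulator`, `add`, `getResult`, `merge`).
- An exact `HashSet`/`HashMap` pair replaces the HLL and items sketches.

Windowed `aggregate()` also rejects rich functions. A helper like the serDe would therefore still have to be created without `open()`, for example locally in `getResult()`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class AggregateSketch {

    // Hypothetical stand-in mirroring the methods of Flink's AggregateFunction<IN, ACC, OUT>.
    interface Aggregator<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    // Accumulator: exact stand-ins for an HLL union (distinct values) and an items sketch (counts).
    static final class Acc {
        final Set<String> distinct = new HashSet<>();
        final Map<String, Long> counts = new HashMap<>();
    }

    // Result object, a hypothetical stand-in for the stats POJO.
    static final class Stats {
        final int distinctCount;
        final String topItem;

        Stats(int distinctCount, String topItem) {
            this.distinctCount = distinctCount;
            this.topItem = topItem;
        }
    }

    static final class ColumnStats implements Aggregator<String, Acc, Stats> {
        @Override
        public Acc createAccumulator() {
            return new Acc();
        }

        @Override
        public Acc add(String value, Acc acc) {
            // Update the live accumulator in place; nothing is (de)serialized per element.
            acc.distinct.add(value);
            acc.counts.merge(value, 1L, Long::sum);
            return acc;
        }

        @Override
        public Stats getResult(Acc acc) {
            // The single conversion step, where a serDe-like helper could be created locally.
            String top = null;
            long best = -1;
            for (Map.Entry<String, Long> e : acc.counts.entrySet()) {
                if (e.getValue() > best) {
                    best = e.getValue();
                    top = e.getKey();
                }
            }
            return new Stats(acc.distinct.size(), top);
        }

        @Override
        public Acc merge(Acc a, Acc b) {
            a.distinct.addAll(b.distinct);
            b.counts.forEach((k, v) -> a.counts.merge(k, v, Long::sum));
            return a;
        }
    }

    // Feeds two partial inputs into separate accumulators, merges them, and returns the result.
    static Stats run(List<String> left, List<String> right) {
        ColumnStats f = new ColumnStats();
        Acc a = f.createAccumulator();
        for (String s : left) a = f.add(s, a);
        Acc b = f.createAccumulator();
        for (String s : right) b = f.add(s, b);
        return f.getResult(f.merge(a, b));
    }

    public static void main(String[] args) {
        Stats s = run(List.of("a", "b", "a"), List.of("c", "a"));
        System.out.println(s.distinctCount + " " + s.topItem); // prints "3 a"
    }
}
```

Flink calls `merge()` only when windows are merged (for example, session windows). With a count window, `add()` runs once per element and `getResult()` runs once when the window fires.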
>
> On 24.07.20 17:02, Flavio Pompermaier wrote:
> > In my reduce function I want to compute some aggregation on the
> > sub-results of a map-partition (that I tried to migrate from DataSet to
> > DataStream without success).
> > The original code was something like:
> >
> >   input.mapPartition(new RowToStringSketches(sketchMapSize)) //
> >          .reduce(new SketchesStringReducer()) //
> >          .map(new SketchesStringToStatsPojo(colIndex, topK));
> >
> > I asked about simulating the mapPartition function in the streaming
> > environment in another thread on the mailing list [1] because I was not
> > able to test it: it seems that the program was exiting before it could
> > process anything.
> > So I gave up on replacing the DataSet API with the DataStream API for the
> > moment; it seems that there are still too many things to migrate.
> > Btw, this is the reduce function:
> >
> > public class SketchesStringReducer extends RichReduceFunction<Tuple2<byte[], byte[]>> {
> >    private static final long serialVersionUID = 1L;
> >
> >    // transient: not shipped with the function, so it is re-created in open()
> >    private transient ArrayOfItemsSerDe<String> serDe;
> >
> >    @Override
> >    public void open(Configuration parameters) throws Exception {
> >      this.serDe = new ArrayOfStringsSerDe();
> >    }
> >
> >    @Override
> >    public Tuple2<byte[], byte[]> reduce(Tuple2<byte[], byte[]> t1, Tuple2<byte[], byte[]> t2)
> >        throws Exception {
> >      // merge the HLL sketches stored in f0
> >      final HllSketch hll1 = HllSketch.heapify(Memory.wrap(t1.f0));
> >      final HllSketch hll2 = HllSketch.heapify(Memory.wrap(t2.f0));
> >      final Union union = new Union(hll1.getLgConfigK());
> >      union.update(hll1);
> >      union.update(hll2);
> >      final byte[] hllSketchBytes = union.getResult().toCompactByteArray();
> >
> >      // merge the items sketches stored in f1 (deserializing them needs serDe)
> >      final ItemsSketch<String> s1 = ItemsSketch.getInstance(Memory.wrap(t1.f1), serDe);
> >      final ItemsSketch<String> s2 = ItemsSketch.getInstance(Memory.wrap(t2.f1), serDe);
> >      final byte[] itemSketchBytes = s1.merge(s2).toByteArray(serDe);
> >      return new Tuple2<>(hllSketchBytes, itemSketchBytes);
> >    }
> > }
> >
> > [1]
> > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Is-there-a-way-to-use-stream-API-with-this-program-td36715.html#a36767
> >
> > On Mon, Jul 20, 2020 at 6:32 PM Aljoscha Krettek <[email protected]>
> > wrote:
> >
> >> What are you trying to do in the ReduceFunction? Without knowing the
> >> code, maybe an aggregate(AggregateFunction) is the solution.
> >>
> >> Best,
> >> Aljoscha
> >>
> >> On 20.07.20 18:03, Flavio Pompermaier wrote:
> >>> Thanks Aljoscha for the reply. So what can I do in my reduce function
> >>> that contains transient variables (i.e. not serializable)?
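Here is a minimal sketch of one common workaround for the transient-field question. It assumes the helper only needs to exist once per deserialized function instance. The idea is to drop `open()` and create the helper lazily on first use. The function object is serialized when the job is submitted, so the `transient` field arrives as `null` on the worker, and the first `reduce()` call rebuilds it.

`ReduceFn` and `Helper` are hypothetical stand-ins:

- `ReduceFn` stands in for Flink's `ReduceFunction`, which is also `Serializable`.
- `Helper` stands in for a non-serializable helper such as `ArrayOfStringsSerDe`.

Plain Java serialization simulates shipping the function to a worker.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class LazyTransientDemo {

    // Hypothetical stand-in for Flink's ReduceFunction<T>, which also extends Serializable.
    interface ReduceFn<T> extends Serializable {
        T reduce(T a, T b) throws Exception;
    }

    // Hypothetical non-serializable helper, playing the role of ArrayOfStringsSerDe.
    static final class Helper {
        static int instances = 0;

        Helper() {
            instances++;
        }

        String join(String a, String b) {
            return a + "|" + b;
        }
    }

    static final class JoiningReducer implements ReduceFn<String> {
        private static final long serialVersionUID = 1L;

        // Skipped by serialization, so it is null after the function is shipped to a worker.
        private transient Helper helper;

        // Lazily (re)creates the helper on first use, taking over the job of open().
        private Helper getHelper() {
            if (helper == null) {
                helper = new Helper();
            }
            return helper;
        }

        @Override
        public String reduce(String a, String b) {
            return getHelper().join(a, b);
        }
    }

    // Simulates shipping the function to a worker with plain Java serialization.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    // Ships a fresh reducer, then reduces three values with the deserialized copy.
    static String demo() {
        try {
            JoiningReducer shipped = roundTrip(new JoiningReducer());
            return shipped.reduce(shipped.reduce("x", "y"), "z");
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo() + ", helpers created: " + Helper.instances); // prints "x|y|z, helpers created: 1"
    }
}
```

If the helper is stateless, a `private static final` field is another option. Static fields belong to the class and are never serialized with the function.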
> >>>
> >>> On Mon, Jul 20, 2020 at 4:38 PM Aljoscha Krettek <[email protected]>
> >>> wrote:
> >>>
> >>>> Hi Flavio,
> >>>>
> >>>> the reason is that under the covers the ReduceFunction will be used as
> >>>> the ReduceFunction of a ReducingState. And those cannot be rich
> >>>> functions because we cannot provide all the required context "inside"
> >>>> the state backend.
> >>>>
> >>>> You can see how the ReduceFunction is used to create a
> >>>> ReducingStateDescriptor here:
> >>>>
> >>>> https://github.com/apache/flink/blob/0c43649882c831c1ec88f4e33d8a59b1cbf5f2fe/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java#L300
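The point about the state backend can be illustrated with a toy model. This is not Flink's actual implementation. It models a count window whose contents live in a reducing state. Each incoming element is folded into the stored value immediately via the reduce function. The state object exposes only `add`, `get` and `clear`, so the model has no lifecycle step where `open()` could run.

`ToyReducingState` and `countWindowReduce` are hypothetical names. The `size` parameter plays the role of the argument to `countWindowAll`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

public class CountWindowModel {

    // Toy reducing state (not Flink's implementation): add() eagerly folds each element into the
    // stored value with the reduce function; there is no open()/close() step anywhere.
    static final class ToyReducingState<T> {
        private final BinaryOperator<T> reduceFn;
        private T value;

        ToyReducingState(BinaryOperator<T> reduceFn) {
            this.reduceFn = reduceFn;
        }

        void add(T element) {
            value = (value == null) ? element : reduceFn.apply(value, element);
        }

        T get() {
            return value;
        }

        void clear() {
            value = null;
        }
    }

    // Toy count window: emits the reduced value after every `size` elements, then clears the state.
    static <T> List<T> countWindowReduce(List<T> input, int size, BinaryOperator<T> reduceFn) {
        ToyReducingState<T> state = new ToyReducingState<>(reduceFn);
        List<T> out = new ArrayList<>();
        int count = 0;
        for (T element : input) {
            state.add(element);
            if (++count == size) {
                out.add(state.get());
                state.clear();
                count = 0;
            }
        }
        return out; // in this toy, a trailing incomplete window is never emitted
    }

    public static void main(String[] args) {
        System.out.println(countWindowReduce(List.of(1, 2, 3, 4, 5, 6, 7), 3, Integer::sum)); // prints "[6, 15]"
    }
}
```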
> >>>>
> >>>> Best,
> >>>> Aljoscha
> >>>>
> >>>> On 16.07.20 16:28, Flavio Pompermaier wrote:
> >>>>> Hi to all,
> >>>>> I'm trying to apply a rich reduce function after a countWindowAll, but
> >>>>> Flink says
> >>>>> "ReduceFunction of reduce can not be a RichFunction. Please use
> >>>>> reduce(ReduceFunction, WindowFunction) instead."
> >>>>>
> >>>>> Is there any good reason for this? Or am I doing something wrong?
> >>>>>
> >>>>> Best,
> >>>>> Flavio
> >>>>>
> >>>>
> >>>
> >>
> >
>
>
