Hi Jörn!

Thanks for your suggestions.

Btw, just a correction: in the Fish class it's "public Point point", not
"public Point coordinate".

For the double comparison, I implemented what you suggested and used
BigDecimal. I'm still getting the same results, where I see smaller values
after the latest maximum value.
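
For reference, this is roughly how the comparison looks now inside the
reduce function (a sketch of just that change; t and t1 are the two Fish
being compared, as in my code below):

    if (BigDecimal.valueOf(t.point.getX())
            .compareTo(BigDecimal.valueOf(t1.point.getX())) > 0) {
        return t;   // t has the larger x coordinate
    } else {
        return t1;
    }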

I would prefer using the max method, but I need to take the max of the
x-value inside the Point object contained in the Fish object. (I retrieve
it using fish.point.getX().)

The max method on a keyed stream works by providing the field name as a
string, e.g. keyedStream.max("point"), so is there a way I can reference
the x coordinate inside the Point object to max on? In other words,
something like keyedStream.max(fish.point.getX())?
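
In case the field expression cannot reach it, the workaround I'm
considering is to first map each Fish to a (fish_id, x) tuple and
aggregate on the tuple position instead. A rough sketch, reusing the
fishes stream from my earlier mail (Tuple2 here is
org.apache.flink.api.java.tuple.Tuple2):

    DataStream<Tuple2<Integer, Double>> xById = fishes
            .map(new MapFunction<Fish, Tuple2<Integer, Double>>() {
                @Override
                public Tuple2<Integer, Double> map(Fish f) throws Exception {
                    // pull the x coordinate out of the nested Point
                    return Tuple2.of(f.fish_id, f.point.getX());
                }
            });

    xById.keyBy(t -> t.f0)   // still keyed per fish_id
            .max(1)          // running max of the x value at tuple position 1
            .print();

But this drops the rest of the Fish object, so I'd still prefer a field
expression on the nested Point if one exists.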

About your reduce function:
> You execute it by fish_id if I see it correctly. This will create one
> result per fish_id. I propose to first map all fish coordinates under a
> single key and then reduce by this single key.


I will try this next.
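
Something like this is what I understood from your suggestion (a rough
sketch, please correct me if I've misread it):

    fishes.keyBy(f -> 0)   // one constant key, so every Fish goes to the same task
            .reduce(new ReduceFunction<Fish>() {
                @Override
                public Fish reduce(Fish a, Fish b) throws Exception {
                    // keep whichever Fish has the larger x seen so far
                    return a.point.getX() >= b.point.getX() ? a : b;
                }
            })
            .map(new MapFunction<Fish, Double>() {
                @Override
                public Double map(Fish f) throws Exception {
                    return f.point.getX();
                }
            })
            .print();

If I understand correctly, the single key means the reduce effectively
runs on one parallel subtask, which should give one global maximum instead
of one per instance.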

Regards,
Komal





On Thu, 3 Oct 2019 at 16:42, Jörn Franke <jornfra...@gmail.com> wrote:

> Btw. Why don’t you use the max method?
>
>
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/datastream/KeyedStream.html#max-java.lang.String-
>
> See here about the state solution:
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/state.html
>
> About your reduce function:
> You execute it by fish_id if I see it correctly. This will create one
> result per fish_id. I propose to first map all fish coordinates under a
> single key and then reduce by this single key.
>
> On 03.10.2019 at 08:26, Komal Mariam <komal.mar...@gmail.com> wrote:
>
>
> Hello all,
> I'm trying to do a fairly simple task: find the maximum value (Double)
> received so far in a stream. This is what I implemented:
>
> POJO class:
> public class Fish {
>     public Integer fish_id;
>     public Point point;   // position (org.locationtech.jts.geom.Point)
>
>     public Fish() {};
>
>     public Fish(Integer fish_id, double x, double y) {
>         // assign the id and build the Point from x and y
>     }
> }
>
> Java_main.java:
> DataStream<ObjectNode> text = env
>         .addSource(new FlinkKafkaConsumer<>("test",
>                 new JSONKeyValueDeserializationSchema(false), properties)
>                 .setStartFromLatest());
>
> DataStream<Fish> fishes = text.map(new MapFunction<ObjectNode, Fish>() {
>     @Override
>     public Fish map(ObjectNode json) throws Exception {
>         return new Fish(
>                 json.get("value").get("id").asInt(),
>                 json.get("value").get("x_begin").asDouble(),
>                 json.get("value").get("y_begin").asDouble());
>     }
> });
>
>
> I can't seem to apply aggregations on the Point class without extracting
> the x coordinates into a separate stream, so here are the two methods I
> have tried:
>
> Method 1: Simple reduce
>
>
> KeyedStream<Fish, Integer> keyed = fishes.keyBy(s -> s.fish_id);
> keyed.reduce(new ReduceFunction<Fish>() {
>     @Override
>     public Fish reduce(Fish t, Fish t1) throws Exception {
>         if (t.point.getX() > t1.point.getX()) {
>             return t;
>         } else {
>             return t1;
>         }
>     }
> }).map(new MapFunction<Fish, Double>() {
>     @Override
>     public Double map(Fish t) throws Exception {
>         return t.point.getX();
>     }
> }).print();
>
>
>
> Result: 1> -73.8517632
> 1> -73.851446
> 1> -73.851446
> 1> -73.8505642
> 1> -73.851446       //smaller than previous value!
> 1> -73.851446
> 1> -73.851446
> 1> -73.8505642
> 1> -73.8517632
> 1> -73.851446
> 3> -73.85012
> 3> -73.850212
> 3> -73.851979     //smaller than previous value
> 3> -73.850212
> 3> -73.8512969
> 3> -73.8512969
>
> *1)* I'm trying to compute the max, so why do I see smaller values after
> the latest maximum value? I think this is because the order of the
> outputs is not preserved relative to the order of the inputs?
> If so, how do I ensure that the order is preserved and I only see the
> latest maximum value?
>
> *2)* Another thing I have noticed: if instance 1 produces a max of, say,
> -73.8505642, then instance 2 may produce -73.9064, which is again smaller
> than the value produced by instance 1. I'm assuming it's because there is
> no communication between parallel instances, hence they produce two
> values. If so, how do I get ONE maximum value from all the parallel
> instances combined?
>
> Method 2: Using States
>
> public class GetMaximum extends RichMapFunction<Fish, Fish> {
>
>     private transient ValueState<Fish> max;
>
>     @Override
>     public Fish map(Fish input) throws Exception {
>
>         // access the current state value
>         Fish currentMaximum = max.value();
>
>         if (input.point.getX() > currentMaximum.point.getX()) {
>             currentMaximum.fish_id = input.fish_id;
>             currentMaximum.point =
>                     (org.locationtech.jts.geom.Point) input.point.clone();
>             max.update(currentMaximum);
>         }
>         return currentMaximum;
>     }
>
>     @Override
>     public void open(Configuration config) {
>         ValueStateDescriptor<Fish> descriptor =
>                 new ValueStateDescriptor<>(
>                         "maximum", // the state name
>                         TypeInformation.of(new TypeHint<Fish>() {}), // type information
>                         new Fish(-100, 0, 0)); // default value of the state, if nothing was set
>         max = getRuntimeContext().getState(descriptor);
>     }
> }
>
> *3)* I'm getting the same results as method 1. Isn't ValueState shared
> between all instances of the same operator?
>
> *4)* Out of the two approaches, which is the better choice?
>
> Would really appreciate your help!
>
> Best Regards,
> Komal
>
>
>
>
>
