You cannot reliably compare doubles directly in Java (and other languages),
in particular for equality. The reason is that a double has limited precision
and values are rounded. See here for some examples and discussion:

https://howtodoinjava.com/java/basics/correctly-compare-float-double/
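
As a rough illustration of the point above, here is a minimal sketch of a tolerance-based comparison. The class name DoubleCompare, the helper names and the epsilon value 1e-9 are my own assumptions for illustration; they are not from the linked article or from Flink. Note that the relational operators (>, <) compare the stored, already rounded values, so the tolerance mostly matters when two values are expected to be equal.

```java
// Hypothetical helper class, for illustration only.
final class DoubleCompare {
    private DoubleCompare() {}

    /** True if a and b differ by at most eps (absolute tolerance). */
    static boolean nearlyEqual(double a, double b, double eps) {
        return Math.abs(a - b) <= eps;
    }

    /** True only if a exceeds b by more than eps. */
    static boolean definitelyGreater(double a, double b, double eps) {
        return a - b > eps;
    }

    public static void main(String[] args) {
        double sum = 0.1 + 0.2; // stored as 0.30000000000000004

        // Exact equality fails because of rounding.
        System.out.println(sum == 0.3);                  // false
        // Comparison within a tolerance succeeds.
        System.out.println(nearlyEqual(sum, 0.3, 1e-9)); // true
        // Two values taken from the output in the quoted mail.
        System.out.println(definitelyGreater(-73.8505642, -73.851446, 1e-9)); // true
    }
}
```

The epsilon has to be chosen to fit the scale of the data; an absolute tolerance such as 1e-9 is only an assumption that suits coordinates of this magnitude.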

On 03.10.2019 at 08:26, Komal Mariam <komal.mar...@gmail.com> wrote:
> 
> 
> Hello all,
> I'm trying to do a fairly simple task: finding the maximum value
> (Double) received so far in a stream. This is what I implemented:
> 
> POJO class:
> public class Fish{
>     public Integer fish_id;
>     public Point coordinate;   //position
>   
> public Fish() {};
> 
> public Fish(fish_id,double x, double y) {
>    //assign to fish object
> }
> 
> Java_main.java
>  DataStream text  = env
>                 .addSource(new FlinkKafkaConsumer<>("test",
>                         new JSONKeyValueDeserializationSchema(false), properties).setStartFromLatest());
> 
>       DataStream<Fish> fishes = text.map(new MapFunction<ObjectNode, Fish>() {
>             @Override
>             public Fish map(ObjectNode json) throws Exception {
>                Fish fishes = new Fish(
>                        json.get("value").get("id").asInteger(),
>                        json.get("value").get("x_begin").asDouble(),
>                        json.get("value").get("y_begin").asDouble());
>                 return fishes;
>             }
>         });
> 
> 
> I can't seem to apply aggregations on the Point class without extracting the
> x coordinates into a separate stream, so here are the two methods I have tried:
> 
> Method 1: Simple reduce
> 
> KeyedStream<Fish, Integer> keyed = fishes.keyBy(s->s.fish_id);
>         keyed.reduce(new ReduceFunction<Fish>() {
>             @Override
>             public Fish reduce(Fish t, Fish t1) throws Exception {
>                 if (t.X > t1.X) {
>                     return t;
>                 } else
>                     return t1;
>             }
>         }).map(new MapFunction<Fish, Double>() {
>                     @Override
>                     public Double map(Fish t) throws Exception {
>                         return t.point.getX();
>                     }
>                 }).print();
> 
> 
> Result: 1> -73.8517632
> 1> -73.851446
> 1> -73.851446
> 1> -73.8505642
> 1> -73.851446       //smaller than previous value!
> 1> -73.851446
> 1> -73.851446
> 1> -73.8505642
> 1> -73.8517632
> 1> -73.851446
> 3> -73.85012          
> 3> -73.850212
> 3> -73.851979     //smaller than previous value
> 3> -73.850212
> 3> -73.8512969
> 3> -73.8512969  
> 
> 1) I'm trying to compute the max, so why do I see smaller values after the
> latest maximum value? I think this is because the order of the outputs is not
> preserved relative to the inputs?
> If so, how do I ensure that the order is preserved and that I only see the
> latest maximum value?
> 
> 2) Another thing I have noticed is that if instance 1 produces the max, say
> -73.8505642, instance 2 may then produce -73.9064, which is again
> smaller than the value produced by instance 1. I'm assuming it's because there
> is no communication between parallel instances, hence they produce two values.
> If so, how do I get ONE maximum value from all the parallel instances
> combined?
> 
> Method 2: Using States
> 
> public class GetMaximum extends RichMapFunction<Fish, Fish> {
> 
>     private transient ValueState<Fish> max;
> 
>     @Override
>     public Fish map(Fish input) throws Exception {
> 
>         // access the state value
>        Fish currentMaximum = max.value();
> 
>         if (input.point.getX() > currentMaximum.point.getX()) {
>             currentMaximum.objid = input.objid;
>             currentMaximum.point = (org.locationtech.jts.geom.Point) input.point.clone();
>             max.update(currentMaximum);
>         }
>         return currentMaximum;
>     }
> 
>     @Override
>     public void open(Configuration config) {
>         ValueStateDescriptor<Fish> descriptor =
>                 new ValueStateDescriptor<>(
>                         "average", // the state name
>                         TypeInformation.of(new TypeHint<Fish>() {}), // type information
>                         new Fish(-100,0)); // default value of the state, if nothing was set
>         max = getRuntimeContext().getState(descriptor);
>     }
> }
> 
> 3) I'm getting the same results as with method 1. Isn't ValueState shared between
> all instances of the same operator?
> 
> 4) Of the two methods, which is the better choice?
> 
> Would really appreciate your help!
> 
> Best Regards,
> Komal
> 
> 
> 
> 
