Hello again Nirmalya,

in this case you are keying by timestamp; think of keying as grouping: it
means that windows are formed separately for each distinct timestamp. I
misread your original post, but now that I see the code I understand your
problem.
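To make the "keying is grouping" idea concrete, here is a Flink-free sketch in plain Scala (the Reading case class and field names are made up for illustration, not taken from your program): keying a stream by timestamp behaves conceptually like a groupBy on that field, so each distinct timestamp ends up in its own group, and windows are then built per group.

```scala
// Hypothetical reading type, just for illustration.
case class Reading(timestamp: Long, temperature: Float)

object KeyBySketch {
  // Conceptually, keyBy(_.timestamp) partitions the stream the way
  // groupBy partitions a collection: one group per distinct key.
  def groupByTimestamp(readings: Seq[Reading]): Map[Long, Seq[Reading]] =
    readings.groupBy(_.timestamp)

  def main(args: Array[String]): Unit = {
    val readings = Seq(Reading(1L, 20f), Reading(1L, 22f), Reading(2L, 24f))
    // Two groups: one for timestamp 1 (two readings), one for timestamp 2.
    groupByTimestamp(readings).foreach { case (ts, rs) =>
      println(s"key=$ts -> ${rs.size} reading(s)")
    }
  }
}
```

Since your timestamps are (mostly) unique, each group holds very few elements, which is why the windows never aggregate across the whole stream.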

I've put some code here:
https://github.com/stefanobaghino/flink-stream-avg-example

It should be more or less the same as your program. What I did was remove
the keying. By not keying the stream, window operations (countWindowAll
instead of countWindow) act on the whole stream, thus forcing a
parallelism of 1. I've added a couple of prints so that you can visualize
the parallelism and what goes into each window. To get a better
understanding of what is going on, you can add the same prints to your
code as well (also printing the key for each keyed window); you'll see
that in your case windows are "grouped" by timestamp.
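To illustrate the semantics without running a Flink job, here is a minimal plain-Scala sketch of what a non-keyed sliding count window, countWindowAll(4, 2), does on a single global stream: it fires every 2 elements and averages the last (up to) 4 elements seen so far. This is an assumption-laden simulation of Flink's trigger/evictor behavior, not Flink code itself:

```scala
object CountWindowAllSketch {
  // Simulates countWindowAll(size, slide) over a whole (non-keyed) stream:
  // fire every `slide` elements, average the last `size` elements seen.
  def slidingCountAverages(readings: Seq[Float], size: Int, slide: Int): Seq[Float] =
    (slide to readings.length by slide).map { end =>
      // Early windows hold fewer than `size` elements, as in Flink.
      val window = readings.slice(math.max(0, end - size), end)
      window.sum / window.size
    }

  def main(args: Array[String]): Unit = {
    val temps = Seq(20f, 22f, 24f, 26f, 28f, 30f)
    // Windows: [20,22] -> 21.0, [20,22,24,26] -> 23.0, [24,26,28,30] -> 27.0
    slidingCountAverages(temps, size = 4, slide = 2).foreach(println)
  }
}
```

Note that a keyed countWindow(4, 2) does the same thing but independently per key, which is why setParallelism(1) on the apply doesn't merge the per-key windows: parallelism controls how many subtasks run the operator, not how the windows are grouped.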

Hope I've been a little bit more helpful than last time. :)

On Sun, Feb 14, 2016 at 6:50 PM, Nirmalya Sengupta <
sengupta.nirma...@gmail.com> wrote:

> Hello Stefano <stefano.bagh...@radicalbit.io>
>
> I have tried to implement what I understood from your mail earlier in the
> day, but it doesn't seem to produce the result I expect. Here's the code
> snippet:
>
> -------------------------------------------------------------------------
>     val env = StreamExecutionEnvironment.createLocalEnvironment(4)
>
>     env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>
>     val readings =
>       readIncomingReadings(env,"./sampleIOTTiny.csv")
>         .keyBy(_.readingTimeStamp)
>         .countWindow(4,2)
>
>     val avgReadings =
>         readings
>         .apply((key: Long,w: Window,v: Iterable[IncomingDataUnit],out:
> Collector[Float]) => {
>
>           val readings : Iterable[Float] = v.map(_.ambientTemperature)
>           val avg = readings.sum / readings.size
>
>           out.collect(avg)
>
>     }).setParallelism(1)
>
>     avgReadings.print()
>
> -------------------------------------------------------------------------
>
> And, here's the output:
>
> -------------------------------------------------------------------------
> 1> 23.67
> 1> 21.0025
> 1> 23.79
> 2> 25.02
> 2> 23.3425
> 2> 25.02
> 3> 26.55
> 4> 19.970001
> 3> 18.93375
> 4> 25.727499
> 3> 18.93375
> 4> 25.7075
> --------------------------------------------------------------------------
>
> My understanding is that because I have associated a parallelism(1) to the
> avgReadings transformation, it should aggregate the streams from all the 4
> earlier windows, and then compute the single average value. It is quite
> apparent that there is a gap in my understanding. Could you please point
> out the mistake that I am making?
>
> Many thanks in advance.
>
> -- Nirmalya
>
> --
> Software Technologist
> http://www.linkedin.com/in/nirmalyasengupta
> "If you have built castles in the air, your work need not be lost. That is
> where they should be.
> Now put the foundation under them."
>



-- 
BR,
Stefano Baghino

Software Engineer @ Radicalbit
