Hello again Nirmalya, in this case you are keying by timestamp. Think of keying as grouping: the stream is partitioned by the timestamp, so each window is built separately per distinct timestamp and only ever holds readings that share that timestamp. I misread your original post, but now that I see the code I understand your problem.
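
To make the idea concrete, here is a rough, self-contained sketch of the non-keyed variant. I'm writing it from memory, so treat it as a sketch rather than the exact code from the repository I link below; the Reading case class and the inline fromElements source are just stand-ins for your IncomingDataUnit and readIncomingReadings helper, and I'm assuming the 1.x Scala API:

-------------------------------------------------------------------------
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector

// Stand-in for your IncomingDataUnit
case class Reading(readingTimeStamp: Long, ambientTemperature: Float)

object NonKeyedAverage {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.createLocalEnvironment(4)

    // Stand-in for readIncomingReadings(env, "./sampleIOTTiny.csv")
    val readings: DataStream[Reading] = env.fromElements(
      Reading(1L, 23.6f), Reading(1L, 21.0f), Reading(2L, 25.0f), Reading(2L, 23.3f),
      Reading(3L, 26.5f), Reading(3L, 19.9f), Reading(4L, 18.9f), Reading(4L, 25.7f))

    // No keyBy: countWindowAll slides over the whole stream and runs with
    // parallelism 1, so every window can mix readings with different timestamps
    val avgReadings: DataStream[Float] =
      readings
        .countWindowAll(4, 2)
        .apply { (w: GlobalWindow, values: Iterable[Reading], out: Collector[Float]) =>
          val temps = values.map(_.ambientTemperature)
          println(s"window of ${temps.size} readings: $temps") // visualize what each window contains
          out.collect(temps.sum / temps.size)
        }

    avgReadings.print()

    env.execute("non-keyed sliding count window average")
  }
}
-------------------------------------------------------------------------

Note that in your snippet the setParallelism(1) on the window operator only changes where the keyed windows are evaluated; the grouping itself happens at keyBy, so it is dropping the keyBy (or keying by something coarser than the timestamp) that changes which readings end up in the same window.
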
I've put some code here: https://github.com/stefanobaghino/flink-stream-avg-example
It should be more or less the same as your program. What I did was remove the keying: by not keying the stream, window operations (countWindowAll instead of countWindow) act on the whole stream, thus forcing a parallelism of 1. I've put a couple of prints in so that you can visualize what the parallelism is and what goes into each window.

To get a better understanding of what is going on, you can put the same prints in your code as well (printing the key for each keyed window, too); you'll see that in your case windows are "grouped" by timestamp. A sketch of what those prints could look like in your keyed version is in the P.S. below.

Hope I've been a little bit more helpful than last time. :)

On Sun, Feb 14, 2016 at 6:50 PM, Nirmalya Sengupta <sengupta.nirma...@gmail.com> wrote:

> Hello Stefano <stefano.bagh...@radicalbit.io>
>
> I have tried to implement what I understood from your mail earlier in the
> day, but it doesn't seem to produce the result I expect. Here's the code
> snippet:
>
> -------------------------------------------------------------------------
> val env = StreamExecutionEnvironment.createLocalEnvironment(4)
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>
> val readings =
>   readIncomingReadings(env, "./sampleIOTTiny.csv")
>     .keyBy(_.readingTimeStamp)
>     .countWindow(4, 2)
>
> val avgReadings =
>   readings
>     .apply((key: Long, w: Window, v: Iterable[IncomingDataUnit], out: Collector[Float]) => {
>
>       val readings: Iterable[Float] = v.map(_.ambientTemperature)
>       val avg = readings.sum / readings.size
>
>       out.collect(avg)
>
>     }).setParallelism(1)
>
> avgReadings.print()
>
> -------------------------------------------------------------------------
>
> And here's the output:
>
> -------------------------------------------------------------------------
> 1> 23.67
> 1> 21.0025
> 1> 23.79
> 2> 25.02
> 2> 23.3425
> 2> 25.02
> 3> 26.55
> 4> 19.970001
> 3> 18.93375
> 4> 25.727499
> 3> 18.93375
> 4> 25.7075
> --------------------------------------------------------------------------
>
> My understanding is that because I have associated a parallelism(1) with the
> avgReadings transformation, it should aggregate the streams from all 4 of the
> earlier windows and then compute a single average value. It is quite apparent
> that there is a gap in my understanding. Could you please point out the
> mistake that I am making?
>
> Many thanks in advance.
>
> -- Nirmalya
>
> --
> Software Technologist
> http://www.linkedin.com/in/nirmalyasengupta
> "If you have built castles in the air, your work need not be lost. That is
> where they should be. Now put the foundation under them."

--
BR,
Stefano Baghino
Software Engineer @ Radicalbit
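
P.S. If you want to see the per-timestamp grouping in your current program, here is a rough sketch of the kind of print I mean, added to a keyed count window. As above, the Reading type and the inline source are hypothetical stand-ins for your IncomingDataUnit and your CSV input, and I'm assuming the 1.x Scala API:

-------------------------------------------------------------------------
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector

// Same stand-in type as in the sketch above
case class Reading(readingTimeStamp: Long, ambientTemperature: Float)

object KeyedWindowDebug {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.createLocalEnvironment(4)

    val readings: DataStream[Reading] = env.fromElements(
      Reading(1L, 23.6f), Reading(1L, 21.0f), Reading(2L, 25.0f), Reading(2L, 23.3f))

    readings
      .keyBy(_.readingTimeStamp)
      .countWindow(4, 2)
      .apply { (key: Long, w: GlobalWindow, values: Iterable[Reading], out: Collector[Float]) =>
        // printing the key shows that each window only ever contains
        // readings that share the same timestamp
        println(s"key=$key, ${values.size} readings")
        val temps = values.map(_.ambientTemperature)
        out.collect(temps.sum / temps.size)
      }
      .print()

    env.execute("keyed count window with debug prints")
  }
}
-------------------------------------------------------------------------

Every printed line should correspond to one window, and all the readings in it share the key it was built for.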