Hello Fabian <[email protected]>
Merry Christmas to you and everyone else in this forum.
Another neophyte's question; please bear with me.
I have the following code:

val env = StreamExecutionEnvironment.createLocalEnvironment(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val readings =
  readIncomingReadings(env, "./sampleIOT.csv")
    .map(e => (e.timeStamp, e.ambientTemperature))
    .assignAscendingTimestamps(pairOfReadings => pairOfReadings._1)
    .timeWindowAll(Time.milliseconds(3))
    .maxBy(1)
In the data file, the timestamp is the second field from the right (first few
records shown only):
probe-42a9ddca,193,819.12,74.3712,1448028161,22.07
probe-252a5bbd,197,710.32,80.6072,1448028161,14.64
probe-987f2cb6,200,750.4,76.0533,1448028161,14.72
probe-24444323,197,816.06,84.0816,1448028161,4.405
probe-6dd6fdc4,201,717.64,78.4031,1448028161,29.43
probe-20c609fb,204,804.37,84.5243,1448028161,22.87
probe-c027fdc9,195,858.61,81.7682,1448028161,24.47
probe-2c6cd3de,198,826.96,85.26,1448028162,18.99
probe-960906ca,197,797.63,77.4359,1448028162,27.62
probe-16226f9e,199,835.5,81.2027,1448028162,18.82
probe-4de4e64b,200,851.04,80.5296,1448028162,27.43
.......
The output is:
(1448028163,27.83)
(1448028166,32.06)
(1448028160,30.02)
The contents are correct, but I am not sure about the *order in which they
appear*. Because I am using

val env = StreamExecutionEnvironment.createLocalEnvironment(1) // only one thread anyway

and the timestamps are guaranteed to be in *ascending order* (I sorted the
CSV before using it), my expectation is that Flink should print the output
as:
(1448028160,30.02)
(1448028163,27.83)
(1448028166,32.06)
How do I explain this apparent randomness in the output order?
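For reference, here is a minimal plain-Scala sketch (no Flink involved; the
(timestamp, temperature) pairs are taken from the sample rows above) of what I
understand a 3 ms tumbling all-window with maxBy(1) should compute. The
window-start arithmetic, ts - (ts % size), is my assumption about how Flink
aligns tumbling windows to the epoch:

```scala
// Plain-Scala simulation of a 3 ms tumbling all-window followed by maxBy(1).
object WindowSketch {
  // (timestamp, ambientTemperature) pairs from the first rows of sampleIOT.csv.
  val readings: Seq[(Long, Double)] = Seq(
    (1448028161L, 22.07), (1448028161L, 14.64), (1448028161L, 14.72),
    (1448028161L, 4.405), (1448028161L, 29.43), (1448028161L, 22.87),
    (1448028161L, 24.47), (1448028162L, 18.99), (1448028162L, 27.62),
    (1448028162L, 18.82), (1448028162L, 27.43)
  )

  // Assign each reading to a tumbling window (start = ts - ts % size),
  // keep the element with the maximum temperature per window (maxBy(1)),
  // and emit one result per window in ascending window order.
  def windowMaxes(rs: Seq[(Long, Double)], sizeMs: Long): Seq[(Long, Double)] =
    rs.groupBy { case (ts, _) => ts - (ts % sizeMs) }
      .toSeq
      .sortBy(_._1)
      .map { case (_, elems) => elems.maxBy(_._2) }

  def main(args: Array[String]): Unit =
    windowMaxes(readings, 3L).foreach(println)
    // prints: (1448028161,29.43) then (1448028162,27.62)
}
```

With only one thread and pre-sorted timestamps, this is the ordering I expected
Flink to reproduce.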
-- Nirmalya
--
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta
"If you have built castles in the air, your work need not be lost. That is
where they should be.
Now put the foundation under them."