Hello,
I have a very simple stream where I window data using event-time.
As a data source I’m using a CSV file, sorted by increasing timestamps.
Here’s the source:
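import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val lines = env.readTextFile(csvFileName)
lines
  .flatMap { l => parseLine(l) }
  // timestamps in the file are in seconds; Flink expects milliseconds
  .assignAscendingTimestamps(t => t.timestampSeconds * 1000L)
  .keyBy(t => t.key)
  // 30-minute windows, sliding every 5 minutes
  .timeWindow(Time.minutes(30), Time.minutes(5))
  // count the elements per key and window
  .fold(0)((c, _) => c + 1)
  .addSink { c =>
    println(c)
  }

env.execute()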
This used to work fine in 1.0.3; that is, the aggregate counts were printed to
stdout.
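To make the expected output concrete, here is a minimal plain-Scala sketch (no Flink involved) of what I expect the pipeline to compute: each event is assigned to every 30-minute window, sliding by 5 minutes, that contains its timestamp, and the events are then counted per key and window. The `Event` case class and the sample data are made up for illustration (my real type comes from `parseLine`), and the epoch-aligned window assignment is my understanding of how sliding event-time windows are assigned, not a copy of Flink's code.

```scala
// Hypothetical event type; the real one is produced by parseLine, which is not shown.
final case class Event(key: String, timestampSeconds: Long)

object SlidingCountSketch {
  val sizeMs: Long  = 30 * 60 * 1000L // window length: 30 minutes
  val slideMs: Long = 5 * 60 * 1000L  // slide: 5 minutes

  // Start timestamps of all sliding windows containing tsMs.
  // Assumes windows are aligned to the epoch and tsMs is non-negative.
  def windowStarts(tsMs: Long): Seq[Long] = {
    val lastStart = tsMs - (tsMs % slideMs)
    Iterator.iterate(lastStart)(_ - slideMs).takeWhile(_ > tsMs - sizeMs).toList
  }

  // Number of events per (key, windowStart), mirroring fold(0)((c, _) => c + 1).
  def counts(events: Seq[Event]): Map[(String, Long), Int] =
    events
      .flatMap(e => windowStarts(e.timestampSeconds * 1000L).map(s => (e.key, s)))
      .groupBy(identity)
      .map { case (k, v) => k -> v.size }

  def main(args: Array[String]): Unit = {
    // Made-up sample data, sorted by increasing timestamp like the CSV file.
    val sample = Seq(Event("a", 0L), Event("a", 600L), Event("b", 600L))
    counts(sample).toSeq.sortBy(_._1).foreach(println)
  }
}
```

With a window size of 30 minutes and a slide of 5 minutes, every event falls into six windows, so even a small input file should produce several non-empty counts.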
However, after updating to 1.1, nothing is printed. I can see the stages being
initialized (switching state from SCHEDULED to DEPLOYING to RUNNING), but they
then immediately go to FINISHED without printing anything out.
If I add a .map {x => println(x); x} after .assignAscendingTimestamps, I can see
the data flowing, so data *is* being read. Is the windowing somehow causing it
to be lost?
Any ideas on where to look for possible causes?
Thanks!
--
Adam Warski
http://twitter.com/#!/adamwarski
http://www.softwaremill.com
http://www.warski.org