Hi,
I have a flink streaming application and I want to count records received
per second (as a way of measuring the throughput of my application).
However, I am using the EventTime time characteristic, as follows:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val s = env.socketTextStream("localhost", 1234)
s.map(line =>
Tuple1(1)).keyBy(0).timeWindow(Time.seconds(1)).sum(0).writeAsCsv("records-per-second-"
+
System.currentTimeMillis())
val mainStrean = s.map(line => {
val Array(p1, p2) = line.split(" ")
(p1, p2.toInt)
})
.assignAscendingTimestamps(p => System.currentTimeMillis())
which naturally gives me this error:
[error] Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE
timestamp (= no timestamp marker). Is the time characteristic set to
'ProcessingTime', or did you forget to call
'DataStream.assignTimestampsAndWatermarks(...)'?
How can I do this?
Thanks.