I'm going to sort elements in a PriorityQueue and register timers at
(currentWatermark + 1), following the timer-coalescing instructions in the docs:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/process_function.html#timer-coalescing
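
To make the intent concrete, here is a Flink-free sketch of the sorting logic I'm after. `WatermarkSortBuffer` and its method names are made up for illustration and are not part of any Flink API; `add` stands in for what processElement would do, and `onWatermark` for what onTimer would do once a timer fires at the new watermark.

```scala
import scala.collection.mutable

// Hypothetical helper (not a Flink class): buffers timestamps and releases
// them in ascending order once the watermark has passed them.
final class WatermarkSortBuffer {
  // mutable.PriorityQueue is a max-heap, so reverse the ordering to get
  // the smallest timestamp at the head.
  private val buffer =
    mutable.PriorityQueue.empty[Long](Ordering[Long].reverse)

  // Stand-in for processElement: only buffer the element.
  def add(timestamp: Long): Unit = buffer.enqueue(timestamp)

  // Stand-in for onTimer: emit every buffered timestamp <= watermark,
  // smallest first, and keep the rest buffered.
  def onWatermark(watermark: Long): List[Long] = {
    val out = mutable.ListBuffer.empty[Long]
    while (buffer.nonEmpty && buffer.head <= watermark) {
      out += buffer.dequeue()
    }
    out.toList
  }
}
```

In this sketch nothing is released while the watermark stays at Long.MinValue, which is why the watermark value matters so much for this approach.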

However, it seems that context.timerService().currentWatermark() always
returns Long.MinValue, so my onTimer is never called. Here's a minimal
program to reproduce the problem. Am I missing something?

```
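import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
sEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
sEnv.setParallelism(argOps.parallelism()) // argOps is not shown in this snippet
sEnv.fromCollection(Seq[Long](1, 2, 3))
    .assignAscendingTimestamps(identity[Long])
    .process(new ProcessFunction[Long, Long] {
      override def processElement(
          i: Long,
          context: ProcessFunction[Long, Long]#Context,
          collector: Collector[Long]): Unit = {
        // Emit the watermark this operator sees while processing each element.
        collector.collect(context.timerService().currentWatermark())
      }
    }).print()
sEnv.execute()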
```

```
-9223372036854775808
-9223372036854775808
-9223372036854775808
```