It seems to be the case. But when I use timeWindow or CEP with
fromCollection, it works well. For example,

```
sEnv.fromCollection(Seq[Long](1, 1002, 2002, 3002))
    .assignAscendingTimestamps(identity[Long])
    .keyBy(_ % 2).timeWindow(Time.seconds(1)).sum(0).print()
```

prints

```
1
1002
2002
3002
```

How can I implement my KeyedProcessFunction so that it works as
expected?
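
A minimal sketch of one possible direction, not a confirmed fix: register the event-time timer at the element's own timestamp instead of at (currentWatermark + 1), so the timer does not depend on a watermark having already arrived when processElement runs. As far as I understand, a bounded source such as fromCollection emits a final Long.MaxValue watermark when it finishes, which would fire any pending event-time timers. The names SortingFunction and SortingJob are hypothetical, and the sketch assumes each element's value equals its timestamp, as in the examples in this thread.

```scala
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

// Hypothetical function: buffers elements per key and emits them in
// ascending order once the watermark passes their timestamp.
class SortingFunction extends KeyedProcessFunction[Long, Long, Long] {

  @transient private var buffer: ListState[Long] = _

  override def open(parameters: Configuration): Unit = {
    buffer = getRuntimeContext.getListState(
      new ListStateDescriptor[Long]("buffer", createTypeInformation[Long]))
  }

  override def processElement(
      value: Long,
      ctx: KeyedProcessFunction[Long, Long, Long]#Context,
      out: Collector[Long]): Unit = {
    buffer.add(value)
    // Timer at the element's timestamp, not at currentWatermark + 1.
    ctx.timerService().registerEventTimeTimer(ctx.timestamp())
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[Long, Long, Long]#OnTimerContext,
      out: Collector[Long]): Unit = {
    // Assumption: value == timestamp (assignAscendingTimestamps(identity)).
    val (ready, pending) = buffer.get().asScala.toSeq.partition(_ <= timestamp)
    ready.sorted.foreach(out.collect)
    buffer.update(pending.asJava)
  }
}

object SortingJob {
  def main(args: Array[String]): Unit = {
    val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
    sEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    sEnv.fromCollection(Seq[Long](1, 1002, 2002, 3002))
      .assignAscendingTimestamps(identity[Long])
      .keyBy(_ % 2)
      .process(new SortingFunction)
      .print()
    sEnv.execute()
  }
}
```

Timers can only be registered on a keyed stream, which is why the sketch calls keyBy before process.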

On Mon, Oct 28, 2019 at 2:04 PM, Dian Fu <dian0511...@gmail.com> wrote:

> Hi,
>
> The underlying implementation of `assignAscendingTimestamps` generates
> watermarks periodically by default. So in your test program, no watermark
> has been generated yet when the elements are processed, and I think that's
> the reason why it's Long.MinValue.
>
> Regards,
> Dian
>
> On Oct 28, 2019, at 11:59 AM, 杨力 <bill.le...@gmail.com> wrote:
>
> I'm going to sort elements in a PriorityQueue and set up timers at
> (currentWatermark + 1), following the instructions in
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/process_function.html#timer-coalescing
> .
>
> However, it seems that context.timerService().currentWatermark() always
> returns Long.MinValue and my onTimer is never called. Here's a minimal
> program to reproduce the problem. Am I missing something?
>
> ```
> val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
> sEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> sEnv.setParallelism(argOps.parallelism())
> sEnv.fromCollection(Seq[Long](1, 2, 3))
>     .assignAscendingTimestamps(identity[Long])
>     .process(new ProcessFunction[Long, Long] {
>       override def processElement(i: Long,
>           context: ProcessFunction[Long, Long]#Context,
>           collector: Collector[Long]): Unit = {
>         collector.collect(context.timerService().currentWatermark())
>       }
>     }).print()
> sEnv.execute()
> ```
>
> ```
> -9223372036854775808
> -9223372036854775808
> -9223372036854775808
> ```
>
>
>
