Re: Watermark won't advance in ProcessFunction

2019-10-28 Thread 杨力
Thank you for your response. Registering a timer at Long.MaxValue works.
I have also found the mistake in my original code.

When a timer fires and the priority queue still holds elements with
timestamps greater than the current watermark, those elements are not
processed. A new timer has to be registered for them. I had simply
forgotten about these unprocessed elements.
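
For readers following the thread, the fix described above might look roughly like the sketch below. It is not the poster's actual code: the class name `SortFunction`, the helper `split`, and the use of `ListState` in place of an in-memory PriorityQueue are assumptions made for illustration. It targets the Flink 1.9 Scala API and assumes each element's value is also its event timestamp, as in the reproduction program.

```scala
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._

object SortFunction {
  // Hypothetical helper: sort the buffer and split it into elements the
  // watermark has already passed (ready) and elements still ahead of it (pending).
  def split(buffered: List[Long], watermark: Long): (List[Long], List[Long]) =
    buffered.sorted.partition(_ <= watermark)
}

// Hypothetical sorter: buffers elements per key and emits them in timestamp order.
class SortFunction extends KeyedProcessFunction[Long, Long, Long] {
  @transient private var buffer: ListState[Long] = _

  override def open(parameters: Configuration): Unit =
    buffer = getRuntimeContext.getListState(
      new ListStateDescriptor[Long]("buffer", createTypeInformation[Long]))

  override def processElement(value: Long,
      ctx: KeyedProcessFunction[Long, Long, Long]#Context,
      out: Collector[Long]): Unit = {
    buffer.add(value)
    // Coalesced timer: fires the next time the watermark advances at all.
    ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 1)
  }

  override def onTimer(timestamp: Long,
      ctx: KeyedProcessFunction[Long, Long, Long]#OnTimerContext,
      out: Collector[Long]): Unit = {
    val watermark = ctx.timerService().currentWatermark()
    val buffered = Option(buffer.get()).map(_.asScala.toList).getOrElse(Nil)
    val (ready, pending) = SortFunction.split(buffered, watermark)
    ready.foreach(out.collect)
    buffer.update(pending.asJava)
    // The step that was missing: elements ahead of the watermark need a new
    // timer, otherwise nothing will ever emit them. When the final
    // Long.MaxValue watermark arrives, pending is empty, so watermark + 1
    // is never computed at that point.
    if (pending.nonEmpty) ctx.timerService().registerEventTimeTimer(watermark + 1)
  }
}
```

Under these assumptions it would be attached with something like `stream.keyBy(_ % 2).process(new SortFunction)`.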



Re: Watermark won't advance in ProcessFunction

2019-10-28 Thread David Anderson
The reason why the watermark is not advancing is that
assignAscendingTimestamps is a periodic watermark generator. This
style of watermark generator is called at regular intervals to create
watermarks -- by default, this is done every 200 msec. With only a
tiny bit of data to process, the job doesn't run long enough for the
watermark generator to ever be called.
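
One way to see the watermark move in such a short test job is to emit a watermark per element instead of on a timer. The sketch below is only an illustration under assumptions: it uses the `AssignerWithPunctuatedWatermarks` interface from Flink 1.9, the names `PunctuatedWatermarkDemo` and `PerElementWatermarks` are made up here, and the parallelism is fixed at 1 for simplicity. Each watermark is emitted after its element, so the first element may still observe Long.MinValue.

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.{AssignerWithPunctuatedWatermarks, ProcessFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.util.Collector

object PunctuatedWatermarkDemo {

  // Hypothetical assigner: uses the element itself as its timestamp and
  // emits a watermark right after every element, independent of wall-clock time.
  class PerElementWatermarks extends AssignerWithPunctuatedWatermarks[Long] {
    override def extractTimestamp(element: Long, previousElementTimestamp: Long): Long =
      element

    // Stay one millisecond behind the element so that another element with
    // the same timestamp is not considered late.
    override def checkAndGetNextWatermark(lastElement: Long, extractedTimestamp: Long): Watermark =
      new Watermark(extractedTimestamp - 1)
  }

  def main(args: Array[String]): Unit = {
    val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
    sEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    sEnv.setParallelism(1)
    sEnv.fromCollection(Seq[Long](1, 2, 3))
      .assignTimestampsAndWatermarks(new PerElementWatermarks)
      .process(new ProcessFunction[Long, Long] {
        override def processElement(i: Long,
            context: ProcessFunction[Long, Long]#Context,
            collector: Collector[Long]): Unit =
          // Emit the watermark this operator has seen when the element arrives.
          collector.collect(context.timerService().currentWatermark())
      })
      .print()
    sEnv.execute()
  }
}
```

Emitting a watermark for every element adds overhead on real workloads, so this is mainly useful for small tests like the one in this thread.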




Re: Watermark won't advance in ProcessFunction

2019-10-28 Thread Dian Fu
Before a program closes, it emits Long.MaxValue as the final watermark, and
that watermark triggers all the windows. This is why your `timeWindow`
program works. In the first program, however, you have not registered an
event-time timer (through context.timerService.registerEventTimeTimer), and
there is no onTimer logic defined to handle it.




Re: Watermark won't advance in ProcessFunction

2019-10-28 Thread 杨力
That seems to be the case. But when I use timeWindow or CEP with
fromCollection, it works well. For example,

```
sEnv.fromCollection(Seq[Long](1, 1002, 2002, 3002))
  .assignAscendingTimestamps(identity[Long])
  .keyBy(_ % 2)
  .timeWindow(Time.seconds(1))
  .sum(0)
  .print()
```

prints

```
1
1002
2002
3002
```

How can I implement my KeyedProcessFunction so that it works as
expected?



Re: Watermark won't advance in ProcessFunction

2019-10-28 Thread Dian Fu
Hi,

`assignAscendingTimestamps` generates watermarks periodically by default in
its underlying implementation. In your test program, no watermark has been
generated yet by the time the elements are processed, and I think that is why
it is still Long.MinValue.

Regards,
Dian

> On 2019-10-28, at 11:59 AM, 杨力 wrote:
> 
> I'm going to sort elements in a PriorityQueue and set up timers at 
> (currentWatermark + 1), following the instructions in 
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/process_function.html#timer-coalescing.
> 
> However, it seems that context.timerService().currentWatermark() always 
> returns Long.MinValue and my onTimer is never called. Here's a minimal 
> program to reproduce the problem. Am I missing something?
> 
> ```
> val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
> sEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> sEnv.setParallelism(argOps.parallelism())
> sEnv.fromCollection(Seq[Long](1, 2, 3))
>   .assignAscendingTimestamps(identity[Long])
>   .process(new ProcessFunction[Long, Long] {
>     override def processElement(i: Long, context: ProcessFunction[Long, Long]#Context,
>                                 collector: Collector[Long]): Unit = {
>       // Emit the watermark this operator has seen when the element arrives.
>       collector.collect(context.timerService().currentWatermark())
>     }
>   })
>   .print()
> sEnv.execute()
> ```
> 
> ```
> -9223372036854775808
> -9223372036854775808
> -9223372036854775808
> ```
>