Re: GroupIntoBatches not working on Flink?

2022-07-27 Thread Moritz Mack
I noticed there’s also a similar bug open for the Spark runner:
https://github.com/apache/beam/issues/21378

The problem seems to be in SimpleDoFnRunner.TimerInternalsTimer#clear(), which no
longer works with InMemoryTimerInternals:
https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java#L1143-L1145
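The call path described above can be pictured with a small stand-alone Java model. Everything in it (TimerStore, SetOnlyStore, UserTimer) is hypothetical and is not Beam's TimerInternals API; it only mirrors the shape of the problem: a user-facing timer whose clear() knows nothing but the timer's namespace and ID, delegating to a store that never implemented delete-by-ID.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of a timer clear() that depends on delete-by-ID; not Beam's classes. */
public class TimerClearSketch {
  interface TimerStore {
    void setTimer(String namespace, String timerId, long timestamp);

    // Default for stores that never implemented delete-by-ID, mirroring the
    // exception message reported in this thread.
    default void deleteTimer(String namespace, String timerId) {
      throw new UnsupportedOperationException("Canceling a timer by ID is not yet supported.");
    }
  }

  /** Can set timers but inherits the unsupported delete-by-ID. */
  static final class SetOnlyStore implements TimerStore {
    final Map<String, Long> timers = new HashMap<>();

    @Override
    public void setTimer(String namespace, String timerId, long timestamp) {
      timers.put(namespace + "/" + timerId, timestamp);
    }
  }

  /** User-facing handle: clear() has no timestamp, only (namespace, timerId). */
  static final class UserTimer {
    private final TimerStore store;
    private final String namespace;
    private final String timerId;

    UserTimer(TimerStore store, String namespace, String timerId) {
      this.store = store;
      this.namespace = namespace;
      this.timerId = timerId;
    }

    void set(long timestamp) {
      store.setTimer(namespace, timerId, timestamp);
    }

    void clear() {
      // Delegates to delete-by-ID, so it fails if the store does not support it.
      store.deleteTimer(namespace, timerId);
    }
  }

  public static void main(String[] args) {
    UserTimer timer = new UserTimer(new SetOnlyStore(), "key-1", "bufferingTimer");
    timer.set(1_000L);
    try {
      timer.clear();
    } catch (UnsupportedOperationException e) {
      System.out.println("clear() failed: " + e.getMessage());
    }
  }
}
```

Running main prints the failure message rather than crashing; a store that indexes timers by (namespace, timerId) would let clear() succeed.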

On 26.07.22, 20:18, "Reuven Lax via user" wrote:


This might be a bug in the Flink runner, because it is implemented 
here.

On Tue, Jul 26, 2022 at 9:14 AM Cristian Constantinescu <zei...@gmail.com> wrote:
Hi everyone,

Quick question about GroupIntoBatches.

When running on Flink, it eventually hits the exception "Canceling a timer by ID 
is not yet supported." on this line [1]. The source inputs are Avro files for 
testing (batch), but will be Kafka topics (streaming) when deployed.

This happens when the batch is filled (10 items) and the max buffering time 
timer needs to be cancelled.

Has anyone else observed this issue?

On a related note, it looks like FlinkStatefulDoFnFunction [2] uses 
InMemoryTimerInternals. I'm curious why FlinkTimerInternals is not used. Is it 
because batch and streaming have different requirements?

Thank you,
Cristian


[1] 
https://github.com/apache/beam/blob/b87f1f01197a78d60fb08a0ee7a4ef96bc4e08eb/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java#L157
[2] 
https://github.com/apache/beam/blob/b87f1f01197a78d60fb08a0ee7a4ef96bc4e08eb/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java#L136
[3] 
https://github.com/apache/beam/blob/b87f1f01197a78d60fb08a0ee7a4ef96bc4e08eb/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L1314


Re: GroupIntoBatches not working on Flink?

2022-07-26 Thread Reuven Lax via user
This might be a bug in the Flink runner, because it is implemented here.

On Tue, Jul 26, 2022 at 9:14 AM Cristian Constantinescu wrote:

> Hi everyone,
>
> Quick question about GroupIntoBatches.
>
> When running on Flink, it eventually hits the exception
> "Canceling a timer by ID is not yet supported." on this line [1].
> The source inputs are Avro files for testing (batch), but will be Kafka
> topics (streaming) when deployed.
>
> This happens when the batch is filled (10 items) and the max buffering
> time timer needs to be cancelled.
>
> Has anyone else observed this issue?
>
> On a related note, it looks like FlinkStatefulDoFnFunction [2] uses
> InMemoryTimerInternals. I'm curious why FlinkTimerInternals is not used. Is
> it because batch and streaming have different requirements?
>
> Thank you,
> Cristian
>
>
> [1]
> https://github.com/apache/beam/blob/b87f1f01197a78d60fb08a0ee7a4ef96bc4e08eb/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java#L157
> [2]
> https://github.com/apache/beam/blob/b87f1f01197a78d60fb08a0ee7a4ef96bc4e08eb/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java#L136
> [3]
> https://github.com/apache/beam/blob/b87f1f01197a78d60fb08a0ee7a4ef96bc4e08eb/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L1314
>


GroupIntoBatches not working on Flink?

2022-07-26 Thread Cristian Constantinescu
Hi everyone,

Quick question about GroupIntoBatches.

When running on Flink, it eventually hits the exception
"Canceling a timer by ID is not yet supported." on this line [1].
The source inputs are Avro files for testing (batch), but will be Kafka
topics (streaming) when deployed.

This happens when the batch is filled (10 items) and the max buffering time
timer needs to be cancelled.
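The behaviour described here (emit a key's buffer once it reaches the batch size, otherwise emit it when a max-buffering-time timer fires) can be sketched in plain Java to show exactly where the cancellation happens. This is a hypothetical, single-threaded model, not Beam's GroupIntoBatches implementation; BatchingSketch, process and advanceTime are illustrative names, and removing an entry from flushTimers stands in for canceling a timer by ID.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical single-threaded model of per-key batching; not Beam's GroupIntoBatches. */
public class BatchingSketch {
  private final int batchSize;
  private final long maxBufferingMillis;
  private final Map<String, List<String>> buffers = new HashMap<>();
  // Pending flush deadline per key; removing an entry models "cancel timer by ID".
  private final Map<String, Long> flushTimers = new HashMap<>();
  private final List<List<String>> emitted = new ArrayList<>();

  BatchingSketch(int batchSize, long maxBufferingMillis) {
    this.batchSize = batchSize;
    this.maxBufferingMillis = maxBufferingMillis;
  }

  void process(String key, String value, long nowMillis) {
    List<String> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
    if (buffer.isEmpty()) {
      // First element of a new batch: arm the max-buffering-time timer.
      flushTimers.put(key, nowMillis + maxBufferingMillis);
    }
    buffer.add(value);
    if (buffer.size() >= batchSize) {
      flush(key);
      // Batch is full, so the pending timer must be canceled. This is the
      // step that fails when the timer store cannot delete a timer by ID.
      flushTimers.remove(key);
    }
  }

  /** Fires every timer whose deadline has passed, emitting partial batches. */
  void advanceTime(long nowMillis) {
    List<String> due = new ArrayList<>();
    for (Map.Entry<String, Long> timer : flushTimers.entrySet()) {
      if (timer.getValue() <= nowMillis) {
        due.add(timer.getKey());
      }
    }
    for (String key : due) {
      flushTimers.remove(key);
      flush(key);
    }
  }

  private void flush(String key) {
    List<String> buffer = buffers.get(key);
    if (buffer != null && !buffer.isEmpty()) {
      emitted.add(new ArrayList<>(buffer));
      buffer.clear();
    }
  }

  List<List<String>> emitted() {
    return emitted;
  }

  int pendingTimers() {
    return flushTimers.size();
  }

  public static void main(String[] args) {
    BatchingSketch batching = new BatchingSketch(10, 5_000L);
    for (int i = 0; i < 12; i++) {
      batching.process("k", "v" + i, i);
    }
    // One full batch emitted; two elements still buffered under a fresh timer.
    System.out.println(batching.emitted().size() + " " + batching.pendingTimers()); // 1 1
    batching.advanceTime(20_000L);
    System.out.println(batching.emitted().size() + " " + batching.pendingTimers()); // 2 0
  }
}
```

With a batch size of 10, the tenth element triggers both the flush and the timer cancellation, which matches the point at which the exception is reported.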

Has anyone else observed this issue?

On a related note, it looks like FlinkStatefulDoFnFunction [2] uses
InMemoryTimerInternals. I'm curious why FlinkTimerInternals is not used. Is
it because batch and streaming have different requirements?
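On the batch-versus-streaming question, one plausible reason (an assumption here, not confirmed in this thread) is that on a bounded input all pending timers for a key can live in memory and be fired once the input is exhausted, while streaming timers have to survive checkpoints. The hypothetical sketch below is not Beam's InMemoryTimerInternals; it only shows that such an in-memory, timestamp-ordered timer queue can still support cancel-by-ID, here via a side index of live timer versions and lazy deletion.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

/** Hypothetical in-memory timer queue for one key; not Beam's InMemoryTimerInternals. */
public class InMemoryTimerQueueSketch {
  private static final class Entry {
    final String timerId;
    final long timestamp;
    final long version; // distinguishes a re-set timer from its stale queue entry

    Entry(String timerId, long timestamp, long version) {
      this.timerId = timerId;
      this.timestamp = timestamp;
      this.version = version;
    }
  }

  // Timers ordered by timestamp, so they fire in event-time order.
  private final PriorityQueue<Entry> queue =
      new PriorityQueue<Entry>((a, b) -> Long.compare(a.timestamp, b.timestamp));
  // Live version per timer ID; a missing ID means canceled or already fired.
  private final Map<String, Long> live = new HashMap<>();
  private long nextVersion = 0;

  void setTimer(String timerId, long timestamp) {
    long version = nextVersion++;
    live.put(timerId, version); // re-setting an ID supersedes the older entry
    queue.add(new Entry(timerId, timestamp, version));
  }

  /** Cancel by ID: forget the live version; the queue entry is skipped when polled. */
  void deleteTimer(String timerId) {
    live.remove(timerId);
  }

  /** Advances the watermark and returns the IDs of timers that fire, in timestamp order. */
  List<String> advanceWatermark(long watermark) {
    List<String> fired = new ArrayList<>();
    while (!queue.isEmpty() && queue.peek().timestamp <= watermark) {
      Entry entry = queue.poll();
      Long liveVersion = live.get(entry.timerId);
      if (liveVersion != null && liveVersion == entry.version) {
        live.remove(entry.timerId);
        fired.add(entry.timerId);
      }
    }
    return fired;
  }

  public static void main(String[] args) {
    InMemoryTimerQueueSketch timers = new InMemoryTimerQueueSketch();
    timers.setTimer("flush", 100L);
    timers.setTimer("gc", 500L);
    timers.deleteTimer("flush"); // e.g. the batch filled up before the deadline
    // Bounded input exhausted: advance to "end of time" and fire what is left.
    System.out.println(timers.advanceWatermark(Long.MAX_VALUE)); // [gc]
  }
}
```

Lazy deletion keeps cancel-by-ID cheap: the heap is never searched, and stale or canceled entries are simply discarded when they reach the front of the queue.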

Thank you,
Cristian


[1]
https://github.com/apache/beam/blob/b87f1f01197a78d60fb08a0ee7a4ef96bc4e08eb/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java#L157
[2]
https://github.com/apache/beam/blob/b87f1f01197a78d60fb08a0ee7a4ef96bc4e08eb/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java#L136
[3]
https://github.com/apache/beam/blob/b87f1f01197a78d60fb08a0ee7a4ef96bc4e08eb/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L1314