Re: Chaining the creation of a WatermarkStrategy doesn't work?

2020-07-08 Thread Niels Basjes
Thanks guys,

It is clear this is a Java thing.

Niels

On Wed, Jul 8, 2020 at 9:56 AM Tzu-Li (Gordon) Tai 
wrote:

> Ah, didn't realize Chesnay has it answered already, sorry for the
> concurrent
> reply :)
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes


Re: Chaining the creation of a WatermarkStrategy doesn't work?

2020-07-08 Thread Tzu-Li (Gordon) Tai
Ah, didn't realize Chesnay has it answered already, sorry for the concurrent
reply :)



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Chaining the creation of a WatermarkStrategy doesn't work?

2020-07-08 Thread Tzu-Li (Gordon) Tai
Hi,

This would be more of a Java question.
In short, type inference of generic types does not work for chained
invocations, and therefore type information has to be explicitly included.

If you'd like to chain the calls, this would work:

WatermarkStrategy watermarkStrategy = WatermarkStrategy
.forBoundedOutOfOrderness(Duration.of(1, ChronoUnit.MINUTES))
.withTimestampAssigner((SerializableTimestampAssigner) (element,
recordTimestamp) -> 42L);

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Chaining the creation of a WatermarkStrategy doesn't work?

2020-07-08 Thread Chesnay Schepler
WatermarkStrategy.forBoundedOutOfOrderness(Duration.of(1, 
ChronoUnit.MINUTES)) returns a WatermarkStrategy, but the exact type 
is entirely dependent on the variable declaration (i.e., it is not 
dependent on any argument).


So, when you assign the strategy to a variable then the compiler can 
infer the generic type. Without a variable it is treated as a 
WatermarkStrategy, because there is nothing to infer the type from.


On 08/07/2020 08:54, Niels Basjes wrote:

Hi,

I'm migrating some of my code to Flink 1.11 and I ran into something I 
find strange.


This works

WatermarkStrategy watermarkStrategy = WatermarkStrategy
 .forBoundedOutOfOrderness(Duration.of(1, ChronoUnit.MINUTES));

watermarkStrategy  
.withTimestampAssigner((SerializableTimestampAssigner) (element, 
recordTimestamp) -> 42L);

However this does NOT work

WatermarkStrategy watermarkStrategy = WatermarkStrategy
 .forBoundedOutOfOrderness(Duration.of(1, ChronoUnit.MINUTES))  
.withTimestampAssigner((SerializableTimestampAssigner) (element, 
recordTimestamp) -> 42L);


When I try to compile this last one I get

Error:(109, 13) java: no suitable method found for 
withTimestampAssigner(org.apache.flink.api.common.eventtime.SerializableTimestampAssigner)
    method 
org.apache.flink.api.common.eventtime.WatermarkStrategy.withTimestampAssigner(org.apache.flink.api.common.eventtime.TimestampAssignerSupplier) 
is not applicable
      (argument mismatch; 
org.apache.flink.api.common.eventtime.SerializableTimestampAssigner 
cannot be converted to 
org.apache.flink.api.common.eventtime.TimestampAssignerSupplier)
    method 
org.apache.flink.api.common.eventtime.WatermarkStrategy.withTimestampAssigner(org.apache.flink.api.common.eventtime.SerializableTimestampAssigner) 
is not applicable
      (argument mismatch; 
org.apache.flink.api.common.eventtime.SerializableTimestampAssigner 
cannot be converted to 
org.apache.flink.api.common.eventtime.SerializableTimestampAssigner)


Why is that?

--
Best regards / Met vriendelijke groeten,

Niels Basjes





Chaining the creation of a WatermarkStrategy doesn't work?

2020-07-08 Thread Niels Basjes
Hi,

I'm migrating some of my code to Flink 1.11 and I ran into something I find
strange.

This works

WatermarkStrategy watermarkStrategy = WatermarkStrategy
.forBoundedOutOfOrderness(Duration.of(1, ChronoUnit.MINUTES));

watermarkStrategy
.withTimestampAssigner((SerializableTimestampAssigner)
(element, recordTimestamp) -> 42L);

However this does NOT work

WatermarkStrategy watermarkStrategy = WatermarkStrategy
.forBoundedOutOfOrderness(Duration.of(1, ChronoUnit.MINUTES))
.withTimestampAssigner((SerializableTimestampAssigner)
(element, recordTimestamp) -> 42L);


When I try to compile this last one I get

Error:(109, 13) java: no suitable method found for
withTimestampAssigner(org.apache.flink.api.common.eventtime.SerializableTimestampAssigner)
method
org.apache.flink.api.common.eventtime.WatermarkStrategy.withTimestampAssigner(org.apache.flink.api.common.eventtime.TimestampAssignerSupplier)
is not applicable
  (argument mismatch;
org.apache.flink.api.common.eventtime.SerializableTimestampAssigner
cannot be converted to
org.apache.flink.api.common.eventtime.TimestampAssignerSupplier)
method
org.apache.flink.api.common.eventtime.WatermarkStrategy.withTimestampAssigner(org.apache.flink.api.common.eventtime.SerializableTimestampAssigner)
is not applicable
  (argument mismatch;
org.apache.flink.api.common.eventtime.SerializableTimestampAssigner
cannot be converted to
org.apache.flink.api.common.eventtime.SerializableTimestampAssigner)

Why is that?

-- 
Best regards / Met vriendelijke groeten,

Niels Basjes