Hi team,

There seems to be a discrepancy where a fix applied to release-1.19.2 is 
missing from release-1.20.0. Specifically, it is commit: 
bfd9b9c31067f51c2f876f9b39b9bf7e83031d90

This commit changes the argument type of checkpointId in: 
org.apache.flink.streaming.api.connector.sink2.CommittableSummary from  Long to 
long.
https://github.com/apache/flink/commit/bfd9b9c31067f51c2f876f9b39b9bf7e83031d90#diff-24c7912c617262cb15b0a582663fccc894d25f37a621b2157377f3f1cab6fd33L41-L73

This change is not in 1.20.0: 
https://github.com/apache/flink/blob/b1fe7b4099497f02b4658df7c3de8e45b62b7e21/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java#L52

but is present again in Flink 2.0: 
https://github.com/apache/flink/blame/cc017da9ae12ea58d473d730a84d39440ef928a3/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java#L55C5-L55C5

This is an issue because it breaks the 
org.apache.iceberg.flink.sink.dynamic.DynamicIcebergSink in the Iceberg 
project. It seems that it assumes that checkpointId arg is a long, not a Long. 
The culprit seems to be in this signature: 
https://github.com/apache/iceberg/blob/apache-iceberg-1.10.0/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java#L97

So when you use it with flink 1.19.1, and 1.20.0 - it fails with: 
java.lang.NoSuchMethodError: 'void 
org.apache.flink.streaming.api.connector.sink2.CommittableSummary.<init>(int, 
int, long, int, int, int)’. But when you use DynamicIcebergSink with flink 
1.19.2 or 1.19.3 (where the commit is present) it works fine.

Further - this creates a problem for using DynamicIcebergSink on Amazon Managed 
Service for Flink, which only supports either Flink 1.19.1 or Flink 1.20.0.

Please let me know what you think. I’m happy to provide any further information 
is required.

Thank you,

Ilia.

Reply via email to