This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 6ca4009297 Flink: Backport clear globalStatisticsState in init to avoid duplication (#14315)
6ca4009297 is described below
commit 6ca4009297483eaf6a66c865bb0498d778000b1a
Author: GuoYu <[email protected]>
AuthorDate: Mon Oct 13 19:25:24 2025 +0800
Flink: Backport clear globalStatisticsState in init to avoid duplication (#14315)
Backports #14294
---
.../apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java | 8 ++++++++
.../iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java | 6 ++++++
.../apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java | 8 ++++++++
.../iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java | 6 ++++++
4 files changed, 28 insertions(+)
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java
index 7995a8a5b1..945deeec27 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java
@@ -119,6 +119,9 @@ public class DataStatisticsOperator extends AbstractStreamOperator<StatisticsOrR
this.globalStatistics = restoredStatistics;
}
+ // Perform a cleanup first to ensure that the state is empty.
+ globalStatisticsState.clear();
+
// Always request for new statistics from coordinator upon task initialization.
// There are a few scenarios this is needed
// 1. downstream writer parallelism changed due to rescale.
@@ -263,4 +266,9 @@ public class DataStatisticsOperator extends AbstractStreamOperator<StatisticsOrR
GlobalStatistics globalStatistics() {
return globalStatistics;
}
+
+ @VisibleForTesting
+ ListState<GlobalStatistics> globalStatisticsState() {
+ return globalStatisticsState;
+ }
}
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java
index f7a7a147e7..da0a498da9 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java
@@ -250,6 +250,12 @@ public class TestDataStatisticsOperator {
testHarness2.setup();
testHarness2.initializeState(snapshot);
+ // When we restore from the savepoint, we should ensure that `globalStatisticsState` has been
+ // completely cleaned up
+ Iterable<GlobalStatistics> globalStatisticsIterable =
+ restoredOperator.globalStatisticsState().get();
+ assertThat(globalStatisticsIterable).isEmpty();
+
GlobalStatistics globalStatistics = restoredOperator.globalStatistics();
// global statistics is always restored and used initially even if
// downstream parallelism changed.
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java
index a873136c91..a481794db7 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java
@@ -122,6 +122,9 @@ public class DataStatisticsOperator extends AbstractStreamOperator<StatisticsOrR
this.globalStatistics = restoredStatistics;
}
+ // Perform a cleanup first to ensure that the state is empty.
+ globalStatisticsState.clear();
+
// Always request for new statistics from coordinator upon task initialization.
// There are a few scenarios this is needed
// 1. downstream writer parallelism changed due to rescale.
@@ -266,4 +269,9 @@ public class DataStatisticsOperator extends AbstractStreamOperator<StatisticsOrR
GlobalStatistics globalStatistics() {
return globalStatistics;
}
+
+ @VisibleForTesting
+ ListState<GlobalStatistics> globalStatisticsState() {
+ return globalStatisticsState;
+ }
}
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java
index 09b2b6371e..408309b2b9 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java
@@ -242,6 +242,12 @@ public class TestDataStatisticsOperator {
testHarness2.setup();
testHarness2.initializeState(snapshot);
+ // When we restore from the savepoint, we should ensure that `globalStatisticsState` has been
+ // completely cleaned up
+ Iterable<GlobalStatistics> globalStatisticsIterable =
+ restoredOperator.globalStatisticsState().get();
+ assertThat(globalStatisticsIterable).isEmpty();
+
GlobalStatistics globalStatistics = restoredOperator.globalStatistics();
// global statistics is always restored and used initially even if
// downstream parallelism changed.