This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 86e53a7e83 Flink: Backport: Dynamic Sink: Document writeParallelism
and fail on invalid configuration (#14758)
86e53a7e83 is described below
commit 86e53a7e83d1604b25535cf8ce34c39e8fd4fa1f
Author: Maximilian Michels <[email protected]>
AuthorDate: Thu Dec 4 11:25:41 2025 +0100
Flink: Backport: Dynamic Sink: Document writeParallelism and fail on
invalid configuration (#14758)
backports #14191
---
.../iceberg/flink/sink/dynamic/DynamicRecord.java | 14 +++++++++++
.../flink/sink/dynamic/HashKeyGenerator.java | 22 ++++++++--------
.../flink/sink/dynamic/TestHashKeyGenerator.java | 29 ++++++++++++++++++++++
.../iceberg/flink/sink/dynamic/DynamicRecord.java | 14 +++++++++++
.../flink/sink/dynamic/HashKeyGenerator.java | 22 ++++++++--------
.../flink/sink/dynamic/TestHashKeyGenerator.java | 29 ++++++++++++++++++++++
6 files changed, 110 insertions(+), 20 deletions(-)
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java
index 600a4d8b95..9f44576608 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java
@@ -39,6 +39,20 @@ public class DynamicRecord {
private boolean upsertMode;
@Nullable private Set<String> equalityFields;
+ /**
+ * Constructs a new DynamicRecord.
+ *
+ * @param tableIdentifier The target table identifier.
+ * @param branch The target table branch.
+ * @param schema The target table schema.
+ * @param rowData The data matching the provided schema.
+ * @param partitionSpec The target table {@link PartitionSpec}.
+ * @param distributionMode The {@link DistributionMode}.
+ * @param writeParallelism The number of parallel writers. Can be set to any value {@literal > 0},
+ *     but will always be automatically capped by the maximum write parallelism, which is the
+ *     parallelism of the sink. Set to Integer.MAX_VALUE for always using the maximum available
+ *     write parallelism.
+ */
public DynamicRecord(
TableIdentifier tableIdentifier,
String branch,
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
index 91aa4a9171..1c611c46b9 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
@@ -99,7 +99,7 @@ class HashKeyGenerator {
dynamicRecord.distributionMode(),
DistributionMode.NONE),
MoreObjects.firstNonNull(
dynamicRecord.equalityFields(),
Collections.emptySet()),
- dynamicRecord.writeParallelism()));
+ Math.min(dynamicRecord.writeParallelism(), maxWriteParallelism)));
try {
return keySelector.getKey(
overrideRowData != null ? overrideRowData : dynamicRecord.rowData());
@@ -242,15 +242,17 @@ class HashKeyGenerator {
String tableName,
int writeParallelism,
int maxWriteParallelism) {
- if (writeParallelism > maxWriteParallelism) {
- LOG.warn(
- "{}: writeParallelism {} is greater than maxWriteParallelism {}. Capping writeParallelism at {}",
- tableName,
- writeParallelism,
- maxWriteParallelism,
- maxWriteParallelism);
- writeParallelism = maxWriteParallelism;
- }
+ Preconditions.checkArgument(
+ writeParallelism > 0,
+ "%s: writeParallelism must be > 0 (is: %s)",
+ tableName,
+ writeParallelism);
+ Preconditions.checkArgument(
+ writeParallelism <= maxWriteParallelism,
+ "%s: writeParallelism (%s) must be <= maxWriteParallelism (%s)",
+ tableName,
+ writeParallelism,
+ maxWriteParallelism);
this.wrapped = wrapped;
this.writeParallelism = writeParallelism;
this.distinctKeys = new int[writeParallelism];
diff --git
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java
index 8d559e9206..04246bf039 100644
---
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java
+++
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.flink.sink.dynamic;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.util.Collections;
import java.util.Map;
@@ -157,6 +158,34 @@ class TestHashKeyGenerator {
assertThat(getSubTaskId(writeKey3, writeParallelism, maxWriteParallelism)).isEqualTo(0);
}
+ @Test
+ void testFailOnNonPositiveWriteParallelism() {
+ final int maxWriteParallelism = 5;
+ HashKeyGenerator generator = new HashKeyGenerator(16, maxWriteParallelism);
+
+ assertThatThrownBy(
+ () -> {
+ getWriteKey(
+ generator,
+ PartitionSpec.unpartitioned(),
+ DistributionMode.NONE,
+ -1, // writeParallelism
+ Collections.emptySet(),
+ GenericRowData.of());
+ });
+
+ assertThatThrownBy(
+ () -> {
+ getWriteKey(
+ generator,
+ PartitionSpec.unpartitioned(),
+ DistributionMode.NONE,
+ 0, // writeParallelism
+ Collections.emptySet(),
+ GenericRowData.of());
+ });
+ }
+
@Test
void testCapAtMaxWriteParallelism() throws Exception {
int writeParallelism = 10;
diff --git
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java
index 600a4d8b95..9f44576608 100644
---
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java
+++
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java
@@ -39,6 +39,20 @@ public class DynamicRecord {
private boolean upsertMode;
@Nullable private Set<String> equalityFields;
+ /**
+ * Constructs a new DynamicRecord.
+ *
+ * @param tableIdentifier The target table identifier.
+ * @param branch The target table branch.
+ * @param schema The target table schema.
+ * @param rowData The data matching the provided schema.
+ * @param partitionSpec The target table {@link PartitionSpec}.
+ * @param distributionMode The {@link DistributionMode}.
+ * @param writeParallelism The number of parallel writers. Can be set to any value {@literal > 0},
+ *     but will always be automatically capped by the maximum write parallelism, which is the
+ *     parallelism of the sink. Set to Integer.MAX_VALUE for always using the maximum available
+ *     write parallelism.
+ */
public DynamicRecord(
TableIdentifier tableIdentifier,
String branch,
diff --git
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
index 91aa4a9171..1c611c46b9 100644
---
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
+++
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
@@ -99,7 +99,7 @@ class HashKeyGenerator {
dynamicRecord.distributionMode(),
DistributionMode.NONE),
MoreObjects.firstNonNull(
dynamicRecord.equalityFields(),
Collections.emptySet()),
- dynamicRecord.writeParallelism()));
+ Math.min(dynamicRecord.writeParallelism(), maxWriteParallelism)));
try {
return keySelector.getKey(
overrideRowData != null ? overrideRowData : dynamicRecord.rowData());
@@ -242,15 +242,17 @@ class HashKeyGenerator {
String tableName,
int writeParallelism,
int maxWriteParallelism) {
- if (writeParallelism > maxWriteParallelism) {
- LOG.warn(
- "{}: writeParallelism {} is greater than maxWriteParallelism {}. Capping writeParallelism at {}",
- tableName,
- writeParallelism,
- maxWriteParallelism,
- maxWriteParallelism);
- writeParallelism = maxWriteParallelism;
- }
+ Preconditions.checkArgument(
+ writeParallelism > 0,
+ "%s: writeParallelism must be > 0 (is: %s)",
+ tableName,
+ writeParallelism);
+ Preconditions.checkArgument(
+ writeParallelism <= maxWriteParallelism,
+ "%s: writeParallelism (%s) must be <= maxWriteParallelism (%s)",
+ tableName,
+ writeParallelism,
+ maxWriteParallelism);
this.wrapped = wrapped;
this.writeParallelism = writeParallelism;
this.distinctKeys = new int[writeParallelism];
diff --git
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java
index 8d559e9206..04246bf039 100644
---
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java
+++
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.flink.sink.dynamic;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.util.Collections;
import java.util.Map;
@@ -157,6 +158,34 @@ class TestHashKeyGenerator {
assertThat(getSubTaskId(writeKey3, writeParallelism, maxWriteParallelism)).isEqualTo(0);
}
+ @Test
+ void testFailOnNonPositiveWriteParallelism() {
+ final int maxWriteParallelism = 5;
+ HashKeyGenerator generator = new HashKeyGenerator(16, maxWriteParallelism);
+
+ assertThatThrownBy(
+ () -> {
+ getWriteKey(
+ generator,
+ PartitionSpec.unpartitioned(),
+ DistributionMode.NONE,
+ -1, // writeParallelism
+ Collections.emptySet(),
+ GenericRowData.of());
+ });
+
+ assertThatThrownBy(
+ () -> {
+ getWriteKey(
+ generator,
+ PartitionSpec.unpartitioned(),
+ DistributionMode.NONE,
+ 0, // writeParallelism
+ Collections.emptySet(),
+ GenericRowData.of());
+ });
+ }
+
@Test
void testCapAtMaxWriteParallelism() throws Exception {
int writeParallelism = 10;