This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 72e06fc4c4 [flink] Support parallelismConfigured in Non-DynamicBucket cases (#5924)
72e06fc4c4 is described below
commit 72e06fc4c4ef2e82473443462b4ca85f1b011b09
Author: yunfengzhou-hub <[email protected]>
AuthorDate: Mon Jul 21 13:31:31 2025 +0800
[flink] Support parallelismConfigured in Non-DynamicBucket cases (#5924)
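
In Flink, each Transformation carries an isParallelismConfigured flag: when the
flag is set, the adaptive batch scheduler treats the vertex's parallelism as
user-specified and skips auto-inference for it. Paimon previously wired its
partition transformations through the one-argument setParallelism(int), which
raises the flag even when the value was merely inherited from upstream. The new
ParallelismUtils#forwardParallelism overload forwards the flag together with
the value. A minimal sketch of the idea (the wrapper method below is
illustrative, not part of this patch):

    // Copy parallelism downstream without freezing it for the adaptive
    // batch scheduler.
    static <T> PartitionTransformation<T> partition(
            DataStream<T> input, StreamPartitioner<T> partitioner) {
        PartitionTransformation<T> partitioned =
                new PartitionTransformation<>(input.getTransformation(), partitioner);
        // Before: one-arg setParallelism marks the parallelism as
        // user-configured even though it was only inherited:
        //     partitioned.setParallelism(input.getParallelism());
        // After: forward the value *and* the configured flag:
        partitioned.setParallelism(
                input.getParallelism(),
                input.getTransformation().isParallelismConfigured());
        return partitioned;
    }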
---
.../paimon/flink/utils/ParallelismUtils.java | 5 +
.../paimon/flink/utils/ParallelismUtils.java | 5 +
.../paimon/flink/sink/FlinkStreamPartitioner.java | 8 +-
.../paimon/flink/utils/ParallelismUtils.java | 7 +
.../apache/paimon/flink/ReadWriteTableITCase.java | 2 +-
.../flink/sink/ParallelismConfiguredITCase.java | 280 +++++++++++++++++++++
6 files changed, 305 insertions(+), 2 deletions(-)
diff --git a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/utils/ParallelismUtils.java b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/utils/ParallelismUtils.java
index c57ade1572..da8627e8f2 100644
--- a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/utils/ParallelismUtils.java
+++ b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/utils/ParallelismUtils.java
@@ -33,6 +33,11 @@ public class ParallelismUtils {
         targetStream.setParallelism(sourceStream.getParallelism());
     }
 
+    public static void forwardParallelism(
+            Transformation<?> targetTransformation, DataStream<?> sourceStream) {
+        targetTransformation.setParallelism(sourceStream.getParallelism());
+    }
+
     public static void setParallelism(
             SingleOutputStreamOperator<?> targetStream,
             int parallelism,
diff --git a/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/paimon/flink/utils/ParallelismUtils.java b/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/paimon/flink/utils/ParallelismUtils.java
index c57ade1572..da8627e8f2 100644
--- a/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/paimon/flink/utils/ParallelismUtils.java
+++ b/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/paimon/flink/utils/ParallelismUtils.java
@@ -33,6 +33,11 @@ public class ParallelismUtils {
         targetStream.setParallelism(sourceStream.getParallelism());
     }
 
+    public static void forwardParallelism(
+            Transformation<?> targetTransformation, DataStream<?> sourceStream) {
+        targetTransformation.setParallelism(sourceStream.getParallelism());
+    }
+
     public static void setParallelism(
             SingleOutputStreamOperator<?> targetStream,
             int parallelism,
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkStreamPartitioner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkStreamPartitioner.java
index efa2a9d7b7..97c4cba1fc 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkStreamPartitioner.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkStreamPartitioner.java
@@ -28,6 +28,8 @@ import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
+import static org.apache.paimon.flink.utils.ParallelismUtils.forwardParallelism;
+
 /** A {@link StreamPartitioner} which wraps a {@link ChannelComputer}. */
 public class FlinkStreamPartitioner<T> extends StreamPartitioner<T> {
 
@@ -73,7 +75,11 @@ public class FlinkStreamPartitioner<T> extends StreamPartitioner<T> {
         FlinkStreamPartitioner<T> partitioner = new FlinkStreamPartitioner<>(channelComputer);
         PartitionTransformation<T> partitioned =
                 new PartitionTransformation<>(input.getTransformation(), partitioner);
-        partitioned.setParallelism(parallelism == null ? input.getParallelism() : parallelism);
+        if (parallelism == null) {
+            forwardParallelism(partitioned, input);
+        } else {
+            partitioned.setParallelism(parallelism);
+        }
         return new DataStream<>(input.getExecutionEnvironment(), partitioned);
     }
 
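
A hedged usage sketch of the two branches above (input and channelComputer are
placeholder values; partition(DataStream, ChannelComputer, Integer) is the
method this hunk modifies):

    // parallelism == null: the value and the isParallelismConfigured flag are
    // both forwarded from the input, so the adaptive batch scheduler stays
    // free to re-infer the shuffle's parallelism.
    DataStream<Row> inherited = FlinkStreamPartitioner.partition(input, channelComputer, null);

    // Explicit parallelism: genuinely user-configured, so the plain
    // one-argument setParallelism(int) is the right call.
    DataStream<Row> pinned = FlinkStreamPartitioner.partition(input, channelComputer, 4);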
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ParallelismUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ParallelismUtils.java
index f22408f4c3..4f0b4a1f85 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ParallelismUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ParallelismUtils.java
@@ -36,6 +36,13 @@ public class ParallelismUtils {
                 sourceStream.getTransformation().isParallelismConfigured());
     }
 
+    public static void forwardParallelism(
+            Transformation<?> targetTransformation, DataStream<?> sourceStream) {
+        targetTransformation.setParallelism(
+                sourceStream.getParallelism(),
+                sourceStream.getTransformation().isParallelismConfigured());
+    }
+
     public static void setParallelism(
             SingleOutputStreamOperator<?> targetStream,
             int parallelism,
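
Note the version split: the paimon-flink-1.15 and paimon-flink-1.16 copies of
forwardParallelism (earlier in this patch) forward only the value, presumably
because Transformation#setParallelism(int, boolean) and
isParallelismConfigured() are not available in those Flink versions, while the
paimon-flink-common copy forwards the flag as well:

    // paimon-flink-1.15 / 1.16: the target still ends up marked as
    // parallelism-configured; only the value can be forwarded.
    targetTransformation.setParallelism(sourceStream.getParallelism());

    // paimon-flink-common (newer Flink): forward the flag too, so inherited
    // parallelism stays open to auto-inference.
    targetTransformation.setParallelism(
            sourceStream.getParallelism(),
            sourceStream.getTransformation().isParallelismConfigured());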
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
index f5e6a797ca..a35bd3025b 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
@@ -1934,7 +1934,7 @@ public class ReadWriteTableITCase extends AbstractTestBase {
 
         boolean hasPartitionTransformation = isFixedBucket || hasPrimaryKey;
         boolean expectedIsParallelismConfigured =
-                (configParallelism != null) || hasPartitionTransformation;
+                (configParallelism != null) || (!isFixedBucket && hasPrimaryKey);
 
         Transformation<?> transformation = sink.getTransformation();
         boolean isPartitionTransformationFound = true;
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/ParallelismConfiguredITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/ParallelismConfiguredITCase.java
new file mode 100644
index 0000000000..fb485ea3b6
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/ParallelismConfiguredITCase.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.flink.source.FlinkSourceBuilder;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.BatchExecutionOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.configuration.CoreOptions.DEFAULT_PARALLELISM;
+import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.apache.paimon.CoreOptions.BUCKET_KEY;
+import static org.apache.paimon.CoreOptions.FILE_FORMAT;
+import static org.apache.paimon.CoreOptions.PATH;
+import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_COMMITTER_OPERATOR_CHAINING;
+import static org.apache.paimon.flink.LogicalTypeConversion.toDataType;
+import static org.apache.paimon.utils.FailingFileIO.retryArtificialException;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for Flink sink's dynamic parallelism inference ability. */
+@ExtendWith(ParameterizedTestExtension.class)
+public class ParallelismConfiguredITCase {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(ParallelismConfiguredITCase.class);
+
+ private static final RowType TABLE_TYPE =
+ new RowType(
+ Arrays.asList(
+ new RowType.RowField("_k", new IntType()),
+ new RowType.RowField("p", new VarCharType(10)),
+ new RowType.RowField("v", new IntType())));
+
+ private static final DataType INPUT_TYPE =
+ DataTypes.ROW(
+ DataTypes.FIELD("_k", DataTypes.INT()),
+ DataTypes.FIELD("p", DataTypes.STRING()),
+ DataTypes.FIELD("v", DataTypes.INT()));
+
+ private static final int NUM_KEYS = 100;
+
+ @TempDir private static java.nio.file.Path temporaryFolder;
+
+ private final boolean isBatch;
+ private final boolean hasPrimaryKey;
+ private final int numBucket;
+
+    public ParallelismConfiguredITCase(boolean isBatch, boolean hasPrimaryKey, int numBucket) {
+ this.isBatch = isBatch;
+ this.hasPrimaryKey = hasPrimaryKey;
+ this.numBucket = numBucket;
+ }
+
+ @Parameters(name = "isBatch={0}, hasPrimaryKey={1}, numBucket={2}")
+ public static List<Object[]> getVarSeg() {
+ List<Boolean> isBatchList = Arrays.asList(true, false);
+ List<Boolean> hasPrimaryKeyList = Arrays.asList(true, false);
+ List<Integer> numBucketList = Arrays.asList(-1, 1, 8);
+ List<Object[]> result = new ArrayList<>();
+ for (Boolean isBatch : isBatchList) {
+ for (Boolean hasPrimaryKey : hasPrimaryKeyList) {
+ for (Integer numBucket : numBucketList) {
+                    result.add(new Object[] {isBatch, hasPrimaryKey, numBucket});
+ }
+ }
+ }
+
+ return result;
+ }
+
+ @TestTemplate
+ public void testParallelismConfigurable() throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put(BUCKET.key(), Integer.toString(numBucket));
+ int[] primaryKey = hasPrimaryKey ? new int[] {0, 1} : new int[] {};
+ if (primaryKey.length == 0 && numBucket > 0) {
+ options.put(BUCKET_KEY.key(), "_k");
+ }
+ options.put(SINK_COMMITTER_OPERATOR_CHAINING.key(), "false");
+        String tempDirPath = new File(temporaryFolder.toFile(), UUID.randomUUID() + "/").toString();
+        FileStoreTable table =
+                buildFileStoreTable(tempDirPath, new int[] {1}, primaryKey, options);
+
+ Configuration configuration = new Configuration();
+ configuration.set(
+ ExecutionOptions.RUNTIME_MODE,
+                isBatch ? RuntimeExecutionMode.BATCH : RuntimeExecutionMode.STREAMING);
+
+        configuration.set(DEFAULT_PARALLELISM, 1);
+        configuration.set(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MIN_PARALLELISM, 1);
+        configuration.set(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MAX_PARALLELISM, 1);
+        StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(configuration);
+ buildJob(env, table, 1);
+ verifyJobGraph(env, table);
+ env.execute();
+ verifyResult(table, 1);
+
+ LOG.info("restart job with parallelism 3");
+
+ configuration.set(DEFAULT_PARALLELISM, 3);
+        configuration.set(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MIN_PARALLELISM, 3);
+        configuration.set(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MAX_PARALLELISM, 3);
+        env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
+ buildJob(env, table, 2);
+ verifyJobGraph(env, table);
+ env.execute();
+ verifyResult(table, 2);
+
+ LOG.info("restart job with parallelism 5");
+
+ configuration.set(DEFAULT_PARALLELISM, 5);
+        configuration.set(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MIN_PARALLELISM, 5);
+        configuration.set(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MAX_PARALLELISM, 5);
+        env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
+ buildJob(env, table, 3);
+ verifyJobGraph(env, table);
+ env.execute();
+ verifyResult(table, 3);
+ }
+
+ private static FileStoreTable buildFileStoreTable(
+ String temporaryPath,
+ int[] partitions,
+ int[] primaryKey,
+ Map<String, String> optionsMap)
+ throws Exception {
+ Options options = new Options();
+ options.set(PATH, temporaryPath);
+ options.set(FILE_FORMAT, CoreOptions.FILE_FORMAT_AVRO);
+ for (Map.Entry<String, String> entry : optionsMap.entrySet()) {
+ options.set(entry.getKey(), entry.getValue());
+ }
+ Path tablePath = new CoreOptions(options.toMap()).path();
+ Schema schema =
+ new Schema(
+ toDataType(TABLE_TYPE).getFields(),
+ Arrays.stream(partitions)
+                                .mapToObj(i -> TABLE_TYPE.getFieldNames().get(i))
+ .collect(Collectors.toList()),
+ Arrays.stream(primaryKey)
+                                .mapToObj(i -> TABLE_TYPE.getFieldNames().get(i))
+ .collect(Collectors.toList()),
+ options.toMap(),
+ "");
+ return retryArtificialException(
+ () -> {
+                    new SchemaManager(LocalFileIO.create(), tablePath).createTable(schema);
+                    return FileStoreTableFactory.create(LocalFileIO.create(), options);
+ });
+ }
+
+    private void buildJob(StreamExecutionEnvironment env, FileStoreTable table, int round) {
+ DataStream<Row> source =
+                env.fromSequence((long) (round - 1) * NUM_KEYS, (long) round * NUM_KEYS - 1)
+ .map(
+ x ->
+ Row.of(
+ x.intValue() % NUM_KEYS,
+ String.valueOf(x % NUM_KEYS),
+ x.intValue()));
+        source.getTransformation().setParallelism(source.getParallelism(), false);
+ new FlinkSinkBuilder(table).forRow(source, INPUT_TYPE).build();
+ }
+
+    private void verifyJobGraph(StreamExecutionEnvironment env, FileStoreTable table) {
+        for (JobVertex jobVertex : env.getStreamGraph(false).getJobGraph().getVertices()) {
+            // The following operators should be forced to have single parallelism, and
+            // they should not be chained with upstream operators, to avoid affecting the
+            // upstream operators' auto-parallelism inference in AQE.
+ if (jobVertex.getName().startsWith("Global Committer")
+ || jobVertex.getName().startsWith("end: Writer")
+ || jobVertex.getName().startsWith("Compact Coordinator")) {
+ assertThat(jobVertex.isParallelismConfigured())
+                        .withFailMessage("Vertex %s should have parallelism configured", jobVertex)
+ .isTrue();
+ assertThat(jobVertex.getParallelism())
+                        .withFailMessage("Vertex %s should have parallelism 1", jobVertex)
+ .isOne();
+ continue;
+ }
+
+            // Dynamic bucket mode operators do not support parallelismConfigured.
+ if (BucketMode.HASH_DYNAMIC.equals(table.bucketMode())
+ && isBatch
+ && (jobVertex.getName().contains("Writer")
+                            || jobVertex.getName().contains("dynamic-bucket-assigner"))) {
+ assertThat(jobVertex.isParallelismConfigured())
+                        .withFailMessage("Vertex %s should have parallelism configured", jobVertex)
+ .isTrue();
+ continue;
+ }
+
+ assertThat(jobVertex.isParallelismConfigured())
+                    .withFailMessage("Vertex %s should not have parallelism configured", jobVertex)
+ .isFalse();
+ }
+ }
+
+    private void verifyResult(FileStoreTable table, int round) throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStream<Row> source =
+                new FlinkSourceBuilder(table).env(env).sourceBounded(true).buildForRow();
+ List<Row> results = new ArrayList<>();
+ try (CloseableIterator<Row> iterator = source.executeAndCollect()) {
+ while (iterator.hasNext()) {
+ results.add(iterator.next());
+ }
+ }
+ if (hasPrimaryKey) {
+ assertThat(results).hasSize(NUM_KEYS);
+ results.sort(Comparator.comparingInt(x -> x.getFieldAs(0)));
+ for (int i = 0; i < NUM_KEYS; i++) {
+ Row result = results.get(i);
+ assertThat(result.getField(0)).isEqualTo(i);
+ assertThat(result.getField(1)).isEqualTo(Integer.toString(i));
+                assertThat((int) result.getFieldAs(2)).isEqualTo((round - 1) * NUM_KEYS + i);
+ }
+ } else {
+ assertThat(results).hasSize(NUM_KEYS * round);
+ results.sort(Comparator.comparingInt(x -> x.getFieldAs(2)));
+ for (int i = 0; i < NUM_KEYS * round; i++) {
+ assertThat(results.get(i))
+                        .isEqualTo(Row.of(i % NUM_KEYS, String.valueOf(i % NUM_KEYS), i));
+ }
+ }
+ }
+}
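
For reference, a hedged sketch of the property the test above asserts (the
names mirror the test code; this is not additional patch content):

    // For each vertex of the compiled job graph, isParallelismConfigured()
    // tells the adaptive batch scheduler whether to keep the vertex's
    // parallelism as-is (true) or to infer one at runtime (false).
    for (JobVertex vertex : env.getStreamGraph(false).getJobGraph().getVertices()) {
        if (vertex.isParallelismConfigured()) {
            // pinned: the scheduler keeps vertex.getParallelism()
        } else {
            // open: the scheduler may infer a parallelism within its limits
        }
    }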