This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ca4af649cf [flink] Adopt getTaskInfo() when acquiring parallelism info (#4583)
ca4af649cf is described below
commit ca4af649cf27f4d280c005cc77bee9c860d50bb1
Author: yunfengzhou-hub <[email protected]>
AuthorDate: Mon Nov 25 20:11:46 2024 +0800
[flink] Adopt getTaskInfo() when acquiring parallelism info (#4583)
---
.../paimon/flink/utils/RuntimeContextUtils.java | 32 ++++++++++++++++++++++
.../paimon/flink/utils/RuntimeContextUtils.java | 32 ++++++++++++++++++++++
.../paimon/flink/utils/RuntimeContextUtils.java | 32 ++++++++++++++++++++++
.../paimon/flink/utils/RuntimeContextUtils.java | 32 ++++++++++++++++++++++
.../flink/service/QueryExecutorOperator.java | 10 ++++---
.../paimon/flink/sink/CommitterOperator.java | 5 +++-
.../flink/sink/HashBucketAssignerOperator.java | 5 ++--
.../sink/MultiTablesStoreCompactOperator.java | 7 +++--
.../paimon/flink/sink/StoreCompactOperator.java | 7 +++--
.../paimon/flink/sink/TableWriteOperator.java | 5 ++--
.../sink/index/GlobalIndexAssignerOperator.java | 5 ++--
.../flink/sink/index/IndexBootstrapOperator.java | 5 ++--
.../apache/paimon/flink/sorter/SortOperator.java | 4 ++-
.../source/AppendBypassCoordinateOperator.java | 3 +-
.../flink/source/BucketUnawareCompactSource.java | 3 +-
.../paimon/flink/utils/RuntimeContextUtils.java | 32 ++++++++++++++++++++++
.../flink/UnawareBucketAppendOnlyTableITCase.java | 3 +-
17 files changed, 201 insertions(+), 21 deletions(-)
diff --git
a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
new file mode 100644
index 0000000000..460fea55ad
--- /dev/null
+++
b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+/** Utility methods about Flink runtime context to resolve compatibility issues. */
+public class RuntimeContextUtils {
+ public static int getNumberOfParallelSubtasks(RuntimeContext context) {
+ return context.getNumberOfParallelSubtasks();
+ }
+
+ public static int getIndexOfThisSubtask(RuntimeContext context) {
+ return context.getIndexOfThisSubtask();
+ }
+}
diff --git
a/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
b/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
new file mode 100644
index 0000000000..460fea55ad
--- /dev/null
+++
b/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+/** Utility methods about Flink runtime context to resolve compatibility issues. */
+public class RuntimeContextUtils {
+ public static int getNumberOfParallelSubtasks(RuntimeContext context) {
+ return context.getNumberOfParallelSubtasks();
+ }
+
+ public static int getIndexOfThisSubtask(RuntimeContext context) {
+ return context.getIndexOfThisSubtask();
+ }
+}
diff --git
a/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
b/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
new file mode 100644
index 0000000000..460fea55ad
--- /dev/null
+++
b/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+/** Utility methods about Flink runtime context to resolve compatibility issues. */
+public class RuntimeContextUtils {
+ public static int getNumberOfParallelSubtasks(RuntimeContext context) {
+ return context.getNumberOfParallelSubtasks();
+ }
+
+ public static int getIndexOfThisSubtask(RuntimeContext context) {
+ return context.getIndexOfThisSubtask();
+ }
+}
diff --git
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
new file mode 100644
index 0000000000..460fea55ad
--- /dev/null
+++
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+/** Utility methods about Flink runtime context to resolve compatibility issues. */
+public class RuntimeContextUtils {
+ public static int getNumberOfParallelSubtasks(RuntimeContext context) {
+ return context.getNumberOfParallelSubtasks();
+ }
+
+ public static int getIndexOfThisSubtask(RuntimeContext context) {
+ return context.getIndexOfThisSubtask();
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryExecutorOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryExecutorOperator.java
index 556c308396..bf0521d550 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryExecutorOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryExecutorOperator.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.flink.utils.RuntimeContextUtils;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFileMetaSerializer;
import org.apache.paimon.service.network.NetworkUtils;
@@ -77,8 +78,8 @@ public class QueryExecutorOperator extends
AbstractStreamOperator<InternalRow>
this.query = ((FileStoreTable)
table).newLocalTableQuery().withIOManager(ioManager);
KvQueryServer server =
new KvQueryServer(
- getRuntimeContext().getIndexOfThisSubtask(),
- getRuntimeContext().getNumberOfParallelSubtasks(),
+
RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext()),
+
RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext()),
NetworkUtils.findHostAddress(),
Collections.singletonList(0).iterator(),
1,
@@ -96,8 +97,9 @@ public class QueryExecutorOperator extends
AbstractStreamOperator<InternalRow>
this.output.collect(
new StreamRecord<>(
GenericRow.of(
-
getRuntimeContext().getNumberOfParallelSubtasks(),
- getRuntimeContext().getIndexOfThisSubtask(),
+
RuntimeContextUtils.getNumberOfParallelSubtasks(
+ getRuntimeContext()),
+
RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext()),
BinaryString.fromString(address.getHostName()),
address.getPort())));
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
index 2ec90b8c6c..021a5db413 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.sink;
+import org.apache.paimon.flink.utils.RuntimeContextUtils;
import org.apache.paimon.utils.Preconditions;
import org.apache.flink.runtime.state.StateInitializationContext;
@@ -129,7 +130,9 @@ public class CommitterOperator<CommitT, GlobalCommitT>
extends AbstractStreamOpe
super.initializeState(context);
Preconditions.checkArgument(
- !forceSingleParallelism ||
getRuntimeContext().getNumberOfParallelSubtasks() == 1,
+ !forceSingleParallelism
+ ||
RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext())
+ == 1,
"Committer Operator parallelism in paimon MUST be one.");
this.currentWatermark = Long.MIN_VALUE;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java
index 70fac7a83e..0c101c6d1e 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.sink;
import org.apache.paimon.flink.ProcessRecordAttributesUtil;
+import org.apache.paimon.flink.utils.RuntimeContextUtils;
import org.apache.paimon.index.BucketAssigner;
import org.apache.paimon.index.HashBucketAssigner;
import org.apache.paimon.index.SimpleHashBucketAssigner;
@@ -76,8 +77,8 @@ public class HashBucketAssignerOperator<T> extends
AbstractStreamOperator<Tuple2
StateUtils.getSingleValueFromState(
context, "commit_user_state", String.class,
initialCommitUser);
- int numberTasks = getRuntimeContext().getNumberOfParallelSubtasks();
- int taskId = getRuntimeContext().getIndexOfThisSubtask();
+ int numberTasks =
RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext());
+ int taskId =
RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext());
long targetRowNum = table.coreOptions().dynamicBucketTargetRowNum();
this.assigner =
overwrite
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
index 7cb5d30c2f..8a1d3a02df 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.utils.RuntimeContextUtils;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFileMetaSerializer;
import org.apache.paimon.options.Options;
@@ -109,8 +110,10 @@ public class MultiTablesStoreCompactOperator
ChannelComputer.select(
partition,
bucket,
-
getRuntimeContext().getNumberOfParallelSubtasks())
- ==
getRuntimeContext().getIndexOfThisSubtask());
+
RuntimeContextUtils.getNumberOfParallelSubtasks(
+ getRuntimeContext()))
+ ==
RuntimeContextUtils.getIndexOfThisSubtask(
+ getRuntimeContext()));
tables = new HashMap<>();
writes = new HashMap<>();
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
index 9b152a81ca..ac10345bc4 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.sink;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.utils.RuntimeContextUtils;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFileMetaSerializer;
import org.apache.paimon.options.Options;
@@ -92,8 +93,10 @@ public class StoreCompactOperator extends
PrepareCommitOperator<RowData, Committ
ChannelComputer.select(
partition,
bucket,
-
getRuntimeContext().getNumberOfParallelSubtasks())
- ==
getRuntimeContext().getIndexOfThisSubtask());
+
RuntimeContextUtils.getNumberOfParallelSubtasks(
+ getRuntimeContext()))
+ ==
RuntimeContextUtils.getIndexOfThisSubtask(
+ getRuntimeContext()));
write =
storeSinkWriteProvider.provide(
table,
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
index 67b4720e29..32fcdd03bd 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.sink;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.flink.ProcessRecordAttributesUtil;
import org.apache.paimon.flink.sink.StoreSinkWriteState.StateValueFilter;
+import org.apache.paimon.flink.utils.RuntimeContextUtils;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.ChannelComputer;
@@ -58,14 +59,14 @@ public abstract class TableWriteOperator<IN> extends
PrepareCommitOperator<IN, C
super.initializeState(context);
boolean containLogSystem = containLogSystem();
- int numTasks = getRuntimeContext().getNumberOfParallelSubtasks();
+ int numTasks =
RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext());
StateValueFilter stateFilter =
(tableName, partition, bucket) -> {
int task =
containLogSystem
? ChannelComputer.select(bucket, numTasks)
: ChannelComputer.select(partition,
bucket, numTasks);
- return task == getRuntimeContext().getIndexOfThisSubtask();
+ return task ==
RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext());
};
state = createState(context, stateFilter);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
index 7fee3f45f3..99cce07fdc 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
@@ -22,6 +22,7 @@ import org.apache.paimon.crosspartition.GlobalIndexAssigner;
import org.apache.paimon.crosspartition.KeyPartOrRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.flink.utils.RuntimeContextUtils;
import org.apache.paimon.table.Table;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -59,8 +60,8 @@ public class GlobalIndexAssignerOperator
assigner.open(
computeManagedMemory(this),
ioManager,
- getRuntimeContext().getNumberOfParallelSubtasks(),
- getRuntimeContext().getIndexOfThisSubtask(),
+
RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext()),
+ RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext()),
this::collect);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrapOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrapOperator.java
index 501e35dff4..5c8ba8f944 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrapOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrapOperator.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.sink.index;
import org.apache.paimon.crosspartition.IndexBootstrap;
import org.apache.paimon.crosspartition.KeyPartOrRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.utils.RuntimeContextUtils;
import org.apache.paimon.utils.SerializableFunction;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -50,8 +51,8 @@ public class IndexBootstrapOperator<T> extends
AbstractStreamOperator<Tuple2<Key
public void initializeState(StateInitializationContext context) throws
Exception {
super.initializeState(context);
bootstrap.bootstrap(
- getRuntimeContext().getNumberOfParallelSubtasks(),
- getRuntimeContext().getIndexOfThisSubtask(),
+
RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext()),
+ RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext()),
this::collect);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
index d4d5dd7416..b6847125fb 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
@@ -23,6 +23,7 @@ import org.apache.paimon.compression.CompressOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.flink.utils.RuntimeContextUtils;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.sort.BinaryExternalSortBuffer;
import org.apache.paimon.types.RowType;
@@ -79,7 +80,8 @@ public class SortOperator extends
TableStreamOperator<InternalRow>
public void open() throws Exception {
super.open();
initBuffer();
- if (sinkParallelism !=
getRuntimeContext().getNumberOfParallelSubtasks()) {
+ if (sinkParallelism
+ !=
RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext())) {
throw new IllegalArgumentException(
"Please ensure that the runtime parallelism of the sink matches the initial configuration "
+ "to avoid potential issues with skewed range partitioning.");
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperator.java
index 668aa24c14..45090f7b68 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperator.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.source;
import org.apache.paimon.append.UnawareAppendCompactionTask;
import org.apache.paimon.append.UnawareAppendTableCompactionCoordinator;
+import org.apache.paimon.flink.utils.RuntimeContextUtils;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.ExecutorUtils;
@@ -67,7 +68,7 @@ public class AppendBypassCoordinateOperator<CommitT>
public void open() throws Exception {
super.open();
checkArgument(
- getRuntimeContext().getNumberOfParallelSubtasks() == 1,
+
RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext()) == 1,
"Compaction Coordinator parallelism in paimon MUST be one.");
long intervalMs =
table.coreOptions().continuousDiscoveryInterval().toMillis();
this.compactTasks = new LinkedBlockingQueue<>();
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java
index e768c717dd..79ee827fe6 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.source;
import org.apache.paimon.append.UnawareAppendCompactionTask;
import org.apache.paimon.append.UnawareAppendTableCompactionCoordinator;
import org.apache.paimon.flink.sink.CompactionTaskTypeInfo;
+import org.apache.paimon.flink.utils.RuntimeContextUtils;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.EndOfScanException;
@@ -87,7 +88,7 @@ public class BucketUnawareCompactSource extends
RichSourceFunction<UnawareAppend
compactionCoordinator =
new UnawareAppendTableCompactionCoordinator(table, streaming,
filter);
Preconditions.checkArgument(
- this.getRuntimeContext().getNumberOfParallelSubtasks() == 1,
+
RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext()) == 1,
"Compaction Operator parallelism in paimon MUST be one.");
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
new file mode 100644
index 0000000000..34e0d041b6
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+/** Utility methods about Flink runtime context to resolve compatibility issues. */
+public class RuntimeContextUtils {
+ public static int getNumberOfParallelSubtasks(RuntimeContext context) {
+ return context.getTaskInfo().getNumberOfParallelSubtasks();
+ }
+
+ public static int getIndexOfThisSubtask(RuntimeContext context) {
+ return context.getTaskInfo().getIndexOfThisSubtask();
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
index cb323542d4..f6dfb1b230 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.utils.RuntimeContextUtils;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.reader.RecordReader;
@@ -395,7 +396,7 @@ public class UnawareBucketAppendOnlyTableITCase extends
CatalogITCaseBase {
@Override
public void run(SourceContext<Integer> sourceContext) throws Exception
{
- int taskId = getRuntimeContext().getIndexOfThisSubtask();
+ int taskId =
RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext());
// wait some time in parallelism #2,
// so that it does not commit in the same checkpoint with parallelism #1
int waitCount = (taskId == 0 ? 0 : 10);