This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new bd7436179d [flink] Fix StoreMultiCommitter with eager init mode (#5187)
bd7436179d is described below
commit bd7436179d384974bc20e0f5179fe1018327cd59
Author: YeJunHao <[email protected]>
AuthorDate: Fri Feb 28 23:07:03 2025 +0800
[flink] Fix StoreMultiCommitter with eager init mode (#5187)
---
.../java/org/apache/paimon/flink/sink/Committer.java | 18 +++++++++++++++++-
.../apache/paimon/flink/sink/CommitterOperator.java | 7 ++++++-
.../sink/MultiTableCommittableChannelComputer.java | 9 ++++++---
.../apache/paimon/flink/sink/StoreMultiCommitter.java | 14 +++++++++++++-
.../paimon/flink/sink/CommitterOperatorTest.java | 4 +++-
5 files changed, 45 insertions(+), 7 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
index 81c2f6b007..23c6c7faeb 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
@@ -81,6 +81,10 @@ public interface Committer<CommitT, GlobalCommitT> extends AutoCloseable {
boolean isRestored();
OperatorStateStore stateStore();
+
+ int getParallelism();
+
+ int getSubtaskIndex();
}
static Context createContext(
@@ -88,7 +92,9 @@ public interface Committer<CommitT, GlobalCommitT> extends AutoCloseable {
@Nullable OperatorMetricGroup metricGroup,
boolean streamingCheckpointEnabled,
boolean isRestored,
- OperatorStateStore stateStore) {
+ OperatorStateStore stateStore,
+ int parallelism,
+ int subtaskIndex) {
return new Committer.Context() {
@Override
public String commitUser() {
@@ -114,6 +120,16 @@ public interface Committer<CommitT, GlobalCommitT> extends AutoCloseable {
public OperatorStateStore stateStore() {
return stateStore;
}
+
+ @Override
+ public int getParallelism() {
+ return parallelism;
+ }
+
+ @Override
+ public int getSubtaskIndex() {
+ return subtaskIndex;
+ }
};
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
index 383cbcd6eb..4db63b4411 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
@@ -130,6 +130,9 @@ public class CommitterOperator<CommitT, GlobalCommitT> extends AbstractStreamOpe
StateUtils.getSingleValueFromState(
context, "commit_user_state", String.class,
initialCommitUser);
// parallelism of commit operator is always 1, so commitUser will never be null
+ int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
+ int index = getRuntimeContext().getIndexOfThisSubtask();
+
committer =
committerFactory.create(
Committer.createContext(
@@ -137,7 +140,9 @@ public class CommitterOperator<CommitT, GlobalCommitT> extends AbstractStreamOpe
getMetricGroup(),
streamingCheckpointEnabled,
context.isRestored(),
- context.getOperatorStateStore()));
+ context.getOperatorStateStore(),
+ parallelism,
+ index));
committableStateManager.initializeState(context, committer);
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableChannelComputer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableChannelComputer.java
index 405c6af271..dccc3b84d7 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableChannelComputer.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableChannelComputer.java
@@ -37,9 +37,12 @@ public class MultiTableCommittableChannelComputer
@Override
public int channel(MultiTableCommittable multiTableCommittable) {
- return Math.floorMod(
- Objects.hash(multiTableCommittable.getDatabase(), multiTableCommittable.getTable()),
- numChannels);
+ return computeChannel(
+ multiTableCommittable.getDatabase(), multiTableCommittable.getTable(), numChannels);
+ }
+
+ public static int computeChannel(String database, String table, int numChannels) {
+ return Math.floorMod(Objects.hash(database, table), numChannels);
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
index 67b2b6bd46..8ad3e4fb08 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
@@ -85,9 +85,21 @@ public class StoreMultiCommitter
this.tableCommitters = new HashMap<>();
this.tableFilter = tableFilter;
+ int parallelism = context.getParallelism();
+ int index = context.getSubtaskIndex();
if (eagerInit) {
- List<Identifier> tableIds = filterTables();
+ List<Identifier> tableIds =
+ filterTables().stream()
+ .filter(
+ identifier ->
+ MultiTableCommittableChannelComputer.computeChannel(
+ identifier.getDatabaseName(),
+ identifier.getTableName(),
+ parallelism)
+ == index)
+ .collect(Collectors.toList());
+
tableIds.stream().forEach(this::getStoreCommitter);
}
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
index 1981abd373..9d3bc135e2 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
@@ -597,7 +597,9 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
OperatorMetricGroup metricGroup =
UnregisteredMetricsGroup.createOperatorMetricGroup();
StoreCommitter committer =
new StoreCommitter(
- table, commit, Committer.createContext("", metricGroup, true, false, null));
+ table,
+ commit,
+ Committer.createContext("", metricGroup, true, false, null, 1, 1));
committer.commit(Collections.singletonList(manifestCommittable));
CommitterMetrics metrics = committer.getCommitterMetrics();
assertThat(metrics.getNumBytesOutCounter().getCount()).isEqualTo(533);