This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 30bf5036d [flink] support multiple writers writing to the same
partition when using kafka as logSystem in unaware bucket mode. (#4516)
30bf5036d is described below
commit 30bf5036d6117aedce7297dacec23155a7d5778c
Author: liming.1018 <[email protected]>
AuthorDate: Wed Nov 13 17:04:39 2024 +0800
[flink] support multiple writers writing to the same partition when using
kafka as logSystem in unaware bucket mode. (#4516)
---
.../java/org/apache/paimon/manifest/ManifestCommittable.java | 7 ++++---
paimon-core/src/test/java/org/apache/paimon/TestFileStore.java | 3 ++-
.../paimon/manifest/ManifestCommittableSerializerTest.java | 2 +-
.../main/java/org/apache/paimon/flink/sink/StoreCommitter.java | 10 +++++++++-
.../java/org/apache/paimon/flink/sink/StoreMultiCommitter.java | 10 ++++++----
.../flink/sink/WrappedManifestCommittableSerializerTest.java | 2 +-
6 files changed, 23 insertions(+), 11 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittable.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittable.java
index 61c4619bd..b4abd0e9e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittable.java
@@ -62,13 +62,14 @@ public class ManifestCommittable {
commitMessages.add(commitMessage);
}
- public void addLogOffset(int bucket, long offset) {
- if (logOffsets.containsKey(bucket)) {
+ public void addLogOffset(int bucket, long offset, boolean allowDuplicate) {
+ if (!allowDuplicate && logOffsets.containsKey(bucket)) {
throw new RuntimeException(
String.format(
"bucket-%d appears multiple times, which is not possible.", bucket));
}
- logOffsets.put(bucket, offset);
+ long newOffset = Math.max(logOffsets.getOrDefault(bucket, offset), offset);
+ logOffsets.put(bucket, newOffset);
}
public long identifier() {
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index 303879337..5218a515a 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -222,7 +222,8 @@ public class TestFileStore extends KeyValueFileStore {
null,
Collections.emptyList(),
(commit, committable) -> {
- logOffsets.forEach(committable::addLogOffset);
+ logOffsets.forEach(
+ (bucket, offset) -> committable.addLogOffset(bucket, offset, false));
commit.commit(committable, Collections.emptyMap());
});
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
index c179a2c0a..8de8309bc 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
@@ -83,7 +83,7 @@ public class ManifestCommittableSerializerTest {
if (!committable.logOffsets().containsKey(bucket)) {
int offset = ID.incrementAndGet();
- committable.addLogOffset(bucket, offset);
+ committable.addLogOffset(bucket, offset, false);
assertThat(committable.logOffsets().get(bucket)).isEqualTo(offset);
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
index d237f4da5..4908b9931 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
@@ -23,6 +23,7 @@ import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
import org.apache.paimon.flink.sink.partition.PartitionListeners;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
@@ -44,6 +45,7 @@ public class StoreCommitter implements Committer<Committable, ManifestCommittabl
private final TableCommitImpl commit;
@Nullable private final CommitterMetrics committerMetrics;
private final PartitionListeners partitionListeners;
+ private final boolean allowLogOffsetDuplicate;
public StoreCommitter(FileStoreTable table, TableCommit commit, Context context) {
this.commit = (TableCommitImpl) commit;
@@ -60,6 +62,7 @@ public class StoreCommitter implements Committer<Committable, ManifestCommittabl
} catch (Exception e) {
throw new RuntimeException(e);
}
+ allowLogOffsetDuplicate = table.bucketMode() == BucketMode.BUCKET_UNAWARE;
}
@VisibleForTesting
@@ -94,7 +97,8 @@ public class StoreCommitter implements Committer<Committable, ManifestCommittabl
case LOG_OFFSET:
LogOffsetCommittable offset =
(LogOffsetCommittable) committable.wrappedCommittable();
- manifestCommittable.addLogOffset(offset.bucket(), offset.offset());
+ manifestCommittable.addLogOffset(
+ offset.bucket(), offset.offset(), allowLogOffsetDuplicate);
break;
}
}
@@ -138,6 +142,10 @@ public class StoreCommitter implements Committer<Committable, ManifestCommittabl
partitionListeners.close();
}
+ public boolean allowLogOffsetDuplicate() {
+ return allowLogOffsetDuplicate;
+ }
+
private void calcNumBytesAndRecordsOut(List<ManifestCommittable> committables) {
if (committerMetrics == null) {
return;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
index aeb3e1857..537a98f97 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
@@ -92,11 +92,11 @@ public class StoreMultiCommitter
WrappedManifestCommittable wrappedManifestCommittable,
List<MultiTableCommittable> committables) {
for (MultiTableCommittable committable : committables) {
+ Identifier identifier =
+ Identifier.create(committable.getDatabase(), committable.getTable());
ManifestCommittable manifestCommittable =
wrappedManifestCommittable.computeCommittableIfAbsent(
- Identifier.create(committable.getDatabase(), committable.getTable()),
- checkpointId,
- watermark);
+ identifier, checkpointId, watermark);
switch (committable.kind()) {
case FILE:
@@ -106,7 +106,9 @@ public class StoreMultiCommitter
case LOG_OFFSET:
LogOffsetCommittable offset =
(LogOffsetCommittable) committable.wrappedCommittable();
- manifestCommittable.addLogOffset(offset.bucket(), offset.offset());
+ StoreCommitter committer = tableCommitters.get(identifier);
+ manifestCommittable.addLogOffset(
+ offset.bucket(), offset.offset(), committer.allowLogOffsetDuplicate());
break;
}
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WrappedManifestCommittableSerializerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WrappedManifestCommittableSerializerTest.java
index 298f3155b..b0aa76f15 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WrappedManifestCommittableSerializerTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WrappedManifestCommittableSerializerTest.java
@@ -98,7 +98,7 @@ class WrappedManifestCommittableSerializerTest {
if (!committable.logOffsets().containsKey(bucket)) {
int offset = ID.incrementAndGet();
- committable.addLogOffset(bucket, offset);
+ committable.addLogOffset(bucket, offset, false);
assertThat(committable.logOffsets().get(bucket)).isEqualTo(offset);
}
}