This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4149e72451 [core] Introduce 'visibility-callback.enabled' (#7235)
4149e72451 is described below
commit 4149e7245155fe04f04e3d047f7077d02d1cacd4
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Feb 9 14:52:02 2026 +0800
[core] Introduce 'visibility-callback.enabled' (#7235)
This feature is particularly useful for write-only tables where data
should be queryable immediately after insertion. The callback ensures
that queries wait for the necessary compaction to complete, providing
immediate visibility guarantees.
Configuration Options
- visibility-callback.enabled: Enable/disable the visibility callback
- visibility-callback.timeout: Maximum time to wait for compaction
- visibility-callback.check-interval: Polling interval when checking
compaction status
---
.../shortcodes/generated/core_configuration.html | 18 +++
.../main/java/org/apache/paimon/CoreOptions.java | 36 +++++
.../java/org/apache/paimon/AbstractFileStore.java | 9 ++
.../paimon/iceberg/IcebergCommitCallback.java | 13 +-
.../metastore/AddPartitionCommitCallback.java | 11 +-
.../ChainTableOverwriteCommitCallback.java | 13 +-
.../paimon/metastore/TagPreviewCommitCallback.java | 14 +-
.../paimon/metastore/VisibilityWaitCallback.java | 148 +++++++++++++++++++++
.../paimon/operation/FileStoreCommitImpl.java | 7 +-
.../apache/paimon/table/sink/CommitCallback.java | 29 +++-
.../apache/paimon/table/sink/TableCommitTest.java | 12 +-
.../org/apache/paimon/spark/PaimonCommitTest.scala | 6 +-
.../paimon/spark/sql/VisibilityCallbackTest.scala | 83 ++++++++++++
13 files changed, 337 insertions(+), 62 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 2e97f9beef..21cd3f7055 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -1452,6 +1452,24 @@ If the data size allocated for the sorting task is
uneven,which may lead to perf
<td>String</td>
<td>The Variant shredding schema for writing.</td>
</tr>
+ <tr>
+ <td><h5>visibility-callback.check-interval</h5></td>
+ <td style="word-wrap: break-word;">10 s</td>
+ <td>Duration</td>
+ <td>The interval for checking visibility when visibility-callback
enabled.</td>
+ </tr>
+ <tr>
+ <td><h5>visibility-callback.enabled</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether to enable the visibility wait callback that waits for
compaction to complete after commit. This is useful for primary key tables with
deletion vectors or postpone bucket mode to ensure data visibility, only used
for batch mode or bounded stream.</td>
+ </tr>
+ <tr>
+ <td><h5>visibility-callback.timeout</h5></td>
+ <td style="word-wrap: break-word;">30 min</td>
+ <td>Duration</td>
+ <td>The maximum time to wait for compaction to complete when
visibility callback is enabled. If the timeout is reached, an exception will be
thrown.</td>
+ </tr>
<tr>
<td><h5>write-buffer-for-append</h5></td>
<td style="word-wrap: break-word;">false</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 2b0112b2ad..e9de99e59d 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2204,6 +2204,30 @@ public class CoreOptions implements Serializable {
.withDescription(
"Whether to try upgrading the data files after
overwriting a primary key table.");
+ public static final ConfigOption<Boolean> VISIBILITY_CALLBACK_ENABLED =
+ key("visibility-callback.enabled")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to enable the visibility wait callback
that waits for compaction to complete "
+ + "after commit. This is useful for
primary key tables with deletion vectors or "
+ + "postpone bucket mode to ensure data
visibility, only used for batch mode or bounded stream.");
+
+ public static final ConfigOption<Duration> VISIBILITY_CALLBACK_TIMEOUT =
+ key("visibility-callback.timeout")
+ .durationType()
+ .defaultValue(Duration.ofMinutes(30))
+ .withDescription(
+ "The maximum time to wait for compaction to
complete when visibility callback is enabled. "
+ + "If the timeout is reached, an exception
will be thrown.");
+
+ public static final ConfigOption<Duration>
VISIBILITY_CALLBACK_CHECK_INTERVAL =
+ key("visibility-callback.check-interval")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(10))
+ .withDescription(
+ "The interval for checking visibility when
visibility-callback enabled.");
+
private final Options options;
public CoreOptions(Map<String, String> options) {
@@ -3431,6 +3455,18 @@ public class CoreOptions implements Serializable {
return options.get(OVERWRITE_UPGRADE);
}
+ public boolean visibilityCallbackEnabled() {
+ return options.get(VISIBILITY_CALLBACK_ENABLED);
+ }
+
+ public Duration visibilityCallbackTimeout() {
+ return options.get(VISIBILITY_CALLBACK_TIMEOUT);
+ }
+
+ public Duration visibilityCallbackCheckInterval() {
+ return options.get(VISIBILITY_CALLBACK_CHECK_INTERVAL);
+ }
+
/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 3c4ef3168a..1abef11a2c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -39,6 +39,7 @@ import org.apache.paimon.metastore.AddPartitionTagCallback;
import org.apache.paimon.metastore.ChainTableCommitPreCallback;
import org.apache.paimon.metastore.ChainTableOverwriteCommitCallback;
import org.apache.paimon.metastore.TagPreviewCommitCallback;
+import org.apache.paimon.metastore.VisibilityWaitCallback;
import org.apache.paimon.operation.ChangelogDeletion;
import org.apache.paimon.operation.FileStoreCommitImpl;
import org.apache.paimon.operation.Lock;
@@ -54,6 +55,7 @@ import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.service.ServiceManager;
import org.apache.paimon.stats.StatsFile;
import org.apache.paimon.stats.StatsFileHandler;
+import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.PartitionModification;
@@ -418,6 +420,13 @@ abstract class AbstractFileStore<T> implements
FileStore<T> {
callbacks.add(new ChainTableOverwriteCommitCallback(table));
}
+ if (options.visibilityCallbackEnabled() &&
!schema.primaryKeys().isEmpty()) {
+ if (table.bucketMode() == BucketMode.POSTPONE_MODE
+ || options.deletionVectorsEnabled()) {
+ callbacks.add(new VisibilityWaitCallback(table));
+ }
+ }
+
callbacks.addAll(CallbackUtils.loadCommitCallbacks(options, table));
return callbacks;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
index ffd99a30bd..3852c7fb6e 100644
---
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
+++
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
@@ -48,7 +48,6 @@ import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestEntry;
-import org.apache.paimon.manifest.SimpleFileEntry;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.schema.SchemaManager;
@@ -221,16 +220,12 @@ public class IcebergCommitCallback implements
CommitCallback, TagCallback {
public void close() throws Exception {}
@Override
- public void call(
- List<SimpleFileEntry> baseFiles,
- List<ManifestEntry> deltaFiles,
- List<IndexManifestEntry> indexFiles,
- Snapshot snapshot) {
+ public void call(Context context) {
createMetadata(
- snapshot,
+ context.snapshot,
(removedFiles, addedFiles) ->
- collectFileChanges(deltaFiles, removedFiles,
addedFiles),
- indexFiles);
+ collectFileChanges(context.deltaFiles, removedFiles,
addedFiles),
+ context.indexFiles);
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java
b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java
index 5a2fd79ab1..74846f184f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java
+++
b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java
@@ -18,13 +18,10 @@
package org.apache.paimon.metastore;
-import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.manifest.FileKind;
-import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestEntry;
-import org.apache.paimon.manifest.SimpleFileEntry;
import org.apache.paimon.table.PartitionModification;
import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.table.sink.CommitMessage;
@@ -62,13 +59,9 @@ public class AddPartitionCommitCallback implements
CommitCallback {
}
@Override
- public void call(
- List<SimpleFileEntry> baseFiles,
- List<ManifestEntry> deltaFiles,
- List<IndexManifestEntry> indexFiles,
- Snapshot snapshot) {
+ public void call(Context context) {
Set<BinaryRow> partitions =
- deltaFiles.stream()
+ context.deltaFiles.stream()
.filter(e -> FileKind.ADD.equals(e.kind()))
.map(ManifestEntry::partition)
.collect(Collectors.toSet());
diff --git
a/paimon-core/src/main/java/org/apache/paimon/metastore/ChainTableOverwriteCommitCallback.java
b/paimon-core/src/main/java/org/apache/paimon/metastore/ChainTableOverwriteCommitCallback.java
index 08a8ea82d1..2d1a0cd705 100644
---
a/paimon-core/src/main/java/org/apache/paimon/metastore/ChainTableOverwriteCommitCallback.java
+++
b/paimon-core/src/main/java/org/apache/paimon/metastore/ChainTableOverwriteCommitCallback.java
@@ -19,13 +19,10 @@
package org.apache.paimon.metastore;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.Snapshot;
import org.apache.paimon.Snapshot.CommitKind;
import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestEntry;
-import org.apache.paimon.manifest.SimpleFileEntry;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.CommitCallback;
@@ -62,17 +59,13 @@ public class ChainTableOverwriteCommitCallback implements
CommitCallback {
}
@Override
- public void call(
- List<SimpleFileEntry> baseFiles,
- List<ManifestEntry> deltaFiles,
- List<IndexManifestEntry> indexFiles,
- Snapshot snapshot) {
+ public void call(Context context) {
if (!ChainTableUtils.isScanFallbackDeltaBranch(coreOptions)) {
return;
}
- if (snapshot.commitKind() != CommitKind.OVERWRITE) {
+ if (context.snapshot.commitKind() != CommitKind.OVERWRITE) {
return;
}
@@ -89,7 +82,7 @@ public class ChainTableOverwriteCommitCallback implements
CommitCallback {
coreOptions.legacyPartitionName());
List<BinaryRow> overwritePartitions =
- deltaFiles.stream()
+ context.deltaFiles.stream()
.map(ManifestEntry::partition)
.distinct()
.collect(Collectors.toList());
diff --git
a/paimon-core/src/main/java/org/apache/paimon/metastore/TagPreviewCommitCallback.java
b/paimon-core/src/main/java/org/apache/paimon/metastore/TagPreviewCommitCallback.java
index afaab4b645..1c74458e52 100644
---
a/paimon-core/src/main/java/org/apache/paimon/metastore/TagPreviewCommitCallback.java
+++
b/paimon-core/src/main/java/org/apache/paimon/metastore/TagPreviewCommitCallback.java
@@ -18,15 +18,10 @@
package org.apache.paimon.metastore;
-import org.apache.paimon.Snapshot;
-import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.ManifestCommittable;
-import org.apache.paimon.manifest.ManifestEntry;
-import org.apache.paimon.manifest.SimpleFileEntry;
import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.tag.TagPreview;
-import java.util.List;
import java.util.Optional;
/** A {@link CommitCallback} to add partitions to metastore for tag preview. */
@@ -41,13 +36,10 @@ public class TagPreviewCommitCallback implements
CommitCallback {
}
@Override
- public void call(
- List<SimpleFileEntry> baseFiles,
- List<ManifestEntry> deltaFiles,
- List<IndexManifestEntry> indexFiles,
- Snapshot snapshot) {
+ public void call(Context context) {
long currentMillis = System.currentTimeMillis();
- Optional<String> tagOptional = tagPreview.extractTag(currentMillis,
snapshot.watermark());
+ Optional<String> tagOptional =
+ tagPreview.extractTag(currentMillis,
context.snapshot.watermark());
tagOptional.ifPresent(tagCallback::notifyCreation);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/metastore/VisibilityWaitCallback.java
b/paimon-core/src/main/java/org/apache/paimon/metastore/VisibilityWaitCallback.java
new file mode 100644
index 0000000000..824c232367
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/metastore/VisibilityWaitCallback.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.metastore;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitCallback;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
+/** A {@link CommitCallback} to wait for compaction for visibility. */
+public class VisibilityWaitCallback implements CommitCallback {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(VisibilityWaitCallback.class);
+
+ private final FileStoreTable table;
+ private final boolean deletionVectorsEnabled;
+ private final Duration timeout;
+ private final Duration checkInterval;
+
+ public VisibilityWaitCallback(FileStoreTable table) {
+ this.table = table;
+ CoreOptions options = table.coreOptions();
+ this.deletionVectorsEnabled = options.deletionVectorsEnabled();
+ this.timeout = options.visibilityCallbackTimeout();
+ this.checkInterval = options.visibilityCallbackCheckInterval();
+ }
+
+ @Override
+ public void call(Context context) {
+ // only work for batch or bounded stream
+ if (context.identifier != BatchWriteBuilder.COMMIT_IDENTIFIER) {
+ return;
+ }
+
+ Set<String> namesToTrack = new HashSet<>();
+ Set<BinaryRow> partitionsToTrack = new HashSet<>();
+ for (ManifestEntry entry : context.deltaFiles) {
+ if (shouldBeTracked(entry)) {
+ namesToTrack.add(entry.fileName());
+ partitionsToTrack.add(entry.partition());
+ }
+ }
+
+ if (namesToTrack.isEmpty()) {
+ return;
+ }
+
+ try {
+ waitForCompaction(context.snapshot, namesToTrack,
partitionsToTrack);
+ } catch (InterruptedException | TimeoutException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void retry(ManifestCommittable committable) {
+ // No-op for retry as the callback is idempotent
+ }
+
+ private void waitForCompaction(
+ Snapshot fromSnapshot, Set<String> namesToTrack, Set<BinaryRow>
partitionsToTrack)
+ throws InterruptedException, TimeoutException {
+ long startTime = System.currentTimeMillis();
+ while (System.currentTimeMillis() - startTime < timeout.toMillis()) {
+ Snapshot latest = table.snapshotManager().latestSnapshot();
+ checkNotNull(latest, "No latest snapshot");
+ if (latest.id() > fromSnapshot.id()
+ && !stillInLatest(latest, namesToTrack,
partitionsToTrack)) {
+ return;
+ }
+ fromSnapshot = latest;
+
+ LOG.info("Waiting for files of table {} to be compacted...",
table.fullName());
+ //noinspection BusyWait
+ Thread.sleep(checkInterval.toMillis());
+ }
+
+ throw new TimeoutException("Timeout waiting for files to be compacted
after " + timeout);
+ }
+
+ private boolean stillInLatest(
+ Snapshot snapshot, Set<String> namesToTrack, Set<BinaryRow>
partitionsToTrack) {
+ Iterator<ManifestEntry> iterator =
+ table.newSnapshotReader()
+ .withSnapshot(snapshot)
+ .withPartitionFilter(new
ArrayList<>(partitionsToTrack))
+ .readFileIterator();
+ while (iterator.hasNext()) {
+ ManifestEntry entry = iterator.next();
+ if (shouldBeTracked(entry) &&
namesToTrack.contains(entry.file().fileName())) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ private boolean shouldBeTracked(ManifestEntry entry) {
+ if (!FileKind.ADD.equals(entry.kind())) {
+ return false;
+ }
+
+ if (deletionVectorsEnabled && entry.level() == 0) {
+ return true;
+ }
+
+ return entry.bucket() == BucketMode.POSTPONE_BUCKET;
+ }
+
+ @Override
+ public void close() throws Exception {
+ // No resources to close
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 7cc0266b43..685b0c23a0 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -1030,9 +1030,10 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
if (strictModeChecker != null) {
strictModeChecker.update(newSnapshotId);
}
- commitCallbacks.forEach(
- callback ->
- callback.call(finalBaseFiles, finalDeltaFiles,
indexFiles, newSnapshot));
+ CommitCallback.Context context =
+ new CommitCallback.Context(
+ finalBaseFiles, finalDeltaFiles, indexFiles,
newSnapshot, identifier);
+ commitCallbacks.forEach(callback -> callback.call(context));
return new SuccessCommitResult();
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitCallback.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitCallback.java
index e1857c928e..01ef58ef09 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitCallback.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitCallback.java
@@ -40,13 +40,32 @@ import java.util.List;
*/
public interface CommitCallback extends AutoCloseable {
- void call(
- List<SimpleFileEntry> baseFiles,
- List<ManifestEntry> deltaFiles,
- List<IndexManifestEntry> indexFiles,
- Snapshot snapshot);
+ void call(Context context);
void retry(ManifestCommittable committable);
default void setTable(FileStoreTable table) {}
+
+ /** Context for callback. */
+ class Context {
+
+ public final List<SimpleFileEntry> baseFiles;
+ public final List<ManifestEntry> deltaFiles;
+ public final List<IndexManifestEntry> indexFiles;
+ public final Snapshot snapshot;
+ public final long identifier;
+
+ public Context(
+ List<SimpleFileEntry> baseFiles,
+ List<ManifestEntry> deltaFiles,
+ List<IndexManifestEntry> indexFiles,
+ Snapshot snapshot,
+ long identifier) {
+ this.baseFiles = baseFiles;
+ this.deltaFiles = deltaFiles;
+ this.indexFiles = indexFiles;
+ this.snapshot = snapshot;
+ this.identifier = identifier;
+ }
+ }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
index abdce054e0..3ccf99476a 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
@@ -19,7 +19,6 @@
package org.apache.paimon.table.sink;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.fs.Path;
@@ -28,10 +27,7 @@ import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.io.DataIncrement;
-import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.ManifestCommittable;
-import org.apache.paimon.manifest.ManifestEntry;
-import org.apache.paimon.manifest.SimpleFileEntry;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
@@ -192,12 +188,8 @@ public class TableCommitTest {
}
@Override
- public void call(
- List<SimpleFileEntry> baseFiles,
- List<ManifestEntry> deltaFiles,
- List<IndexManifestEntry> indexFiles,
- Snapshot snapshot) {
- commitCallbackResult.get(testId).add(snapshot.commitIdentifier());
+ public void call(Context context) {
+
commitCallbackResult.get(testId).add(context.snapshot.commitIdentifier());
}
@Override
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonCommitTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonCommitTest.scala
index 8b1958d7de..653bd5005d 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonCommitTest.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonCommitTest.scala
@@ -64,11 +64,7 @@ object PaimonCommitTest {
case class CustomCommitCallback(testId: String) extends CommitCallback {
- override def call(
- baseFiles: List[SimpleFileEntry],
- deltaFiles: List[ManifestEntry],
- indexFiles: List[IndexManifestEntry],
- snapshot: Snapshot): Unit = {
+ override def call(context: CommitCallback.Context): Unit = {
PaimonCommitTest.id = testId
}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VisibilityCallbackTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VisibilityCallbackTest.scala
new file mode 100644
index 0000000000..dc636e3c99
--- /dev/null
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VisibilityCallbackTest.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.Row
+import org.junit.jupiter.api.Assertions
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration.DurationInt
+
+class VisibilityCallbackTest extends PaimonSparkTestBase {
+
+ Seq((true, false), (false, true)).foreach {
+ case (dv, postpone) =>
+ test(s"Visibility callback with deletion-vectors $dv and postpone-bucket
$postpone") {
+ withTable("T") {
+ val bucket = if (postpone) -2 else 1
+ sql(s"""
+ |CREATE TABLE T (id INT, name STRING)
+ |TBLPROPERTIES (
+ | 'bucket' = '$bucket',
+ | 'primary-key' = 'id',
+ | 'deletion-vectors.enabled' = '$dv',
+ | 'write-only' = 'true'
+ |)
+ |""".stripMargin)
+
+ sql("INSERT INTO T VALUES (1, '1'), (2, '1'), (3, '1')")
+ sql("ALTER TABLE T SET TBLPROPERTIES ('visibility-callback.enabled'
= 'true')")
+ sql("ALTER TABLE T SET TBLPROPERTIES
('visibility-callback.check-interval' = '1s')")
+ if (postpone) {
+ sql("ALTER TABLE T SET TBLPROPERTIES
('postpone.batch-write-fixed-bucket' = 'false')")
+ }
+
+ val table = loadTable("T")
+ val firstSnapshot = table.latestSnapshot().get()
+
+ // Async thread to insert and query
+ val queryResult = Future {
+ // Insert data in async thread
+ sql("INSERT INTO T VALUES (1, '2'), (2, '2'), (3, '2')").collect()
+
+ // Try to query immediately - should wait for compaction due to
visibility callback
+ val result = sql("SELECT * FROM T ORDER BY id").collect()
+ result
+ }
+
+ val compactFuture = Future {
+ while (table.latestSnapshot().get().id == firstSnapshot.id) {
+ println("wait INSERT INTO executed...")
+ Thread.sleep(1000)
+ }
+
+ Thread.sleep(2000)
+ sql("CALL sys.compact('T')")
+ }
+
+ Await.result(compactFuture, 60.seconds)
+ val result = Await.result(queryResult, 60.seconds)
+ Assertions.assertEquals(result.toList, Row(1, "2") :: Row(2, "2") ::
Row(3, "2") :: Nil)
+ }
+ }
+ }
+}