This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e2200201 [core] Fix bug that changelog may not be produced when setting changelog-producer.lookup-wait to false (#2885)
5e2200201 is described below

commit 5e2200201fa72ba420a0abf14160e7ef98f2a8e1
Author: tsreaper <[email protected]>
AuthorDate: Thu Feb 22 11:37:46 2024 +0800

    [core] Fix bug that changelog may not be produced when setting changelog-producer.lookup-wait to false (#2885)
---
 .../org/apache/paimon/append/AppendOnlyWriter.java |  5 ++
 .../paimon/compact/CompactFutureManager.java       |  5 ++
 .../org/apache/paimon/compact/CompactManager.java  |  3 ++
 .../apache/paimon/compact/NoopCompactManager.java  |  5 ++
 .../apache/paimon/mergetree/MergeTreeWriter.java   |  5 ++
 .../paimon/operation/AbstractFileStoreWrite.java   |  6 ++-
 .../java/org/apache/paimon/utils/RecordWriter.java |  3 ++
 .../apache/paimon/table/sink/TableWriteTest.java   | 63 ++++++++++++++++++++--
 .../flink/PrimaryKeyFileStoreTableITCase.java      |  6 ++-
 9 files changed, 96 insertions(+), 5 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index 302ae129c..a2f614291 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -172,6 +172,11 @@ public class AppendOnlyWriter implements 
RecordWriter<InternalRow>, MemoryOwner
         return increment;
     }
 
+    @Override
+    public boolean isCompacting() {
+        return compactManager.isCompacting();
+    }
+
     private void flush(boolean waitForLatestCompaction, boolean 
forcedFullCompaction)
             throws Exception {
         long start = System.currentTimeMillis();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/compact/CompactFutureManager.java 
b/paimon-core/src/main/java/org/apache/paimon/compact/CompactFutureManager.java
index c8a70e427..7a8dc8da0 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/compact/CompactFutureManager.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/compact/CompactFutureManager.java
@@ -44,6 +44,11 @@ public abstract class CompactFutureManager implements 
CompactManager {
         }
     }
 
+    @Override
+    public boolean isCompacting() {
+        return taskFuture != null;
+    }
+
     protected final Optional<CompactResult> innerGetCompactionResult(boolean 
blocking)
             throws ExecutionException, InterruptedException {
         if (taskFuture != null) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/compact/CompactManager.java 
b/paimon-core/src/main/java/org/apache/paimon/compact/CompactManager.java
index c8c89d3c3..88897bd86 100644
--- a/paimon-core/src/main/java/org/apache/paimon/compact/CompactManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/compact/CompactManager.java
@@ -51,4 +51,7 @@ public interface CompactManager extends Closeable {
 
     /** Cancel currently running compaction task. */
     void cancelCompaction();
+
+    /** Check if a compaction is in progress, or if a compaction result 
remains to be fetched. */
+    boolean isCompacting();
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/compact/NoopCompactManager.java 
b/paimon-core/src/main/java/org/apache/paimon/compact/NoopCompactManager.java
index bf1b2c79c..6f84105f3 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/compact/NoopCompactManager.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/compact/NoopCompactManager.java
@@ -70,6 +70,11 @@ public class NoopCompactManager implements CompactManager {
     @Override
     public void cancelCompaction() {}
 
+    @Override
+    public boolean isCompacting() {
+        return false;
+    }
+
     @Override
     public void close() throws IOException {}
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
index 055ffe7d5..d12796c74 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
@@ -258,6 +258,11 @@ public class MergeTreeWriter implements 
RecordWriter<KeyValue>, MemoryOwner {
         return increment;
     }
 
+    @Override
+    public boolean isCompacting() {
+        return compactManager.isCompacting();
+    }
+
     @Override
     public void sync() throws Exception {
         trySyncLatestCompaction(true);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index da7fba046..7b119b858 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -211,7 +211,11 @@ public abstract class AbstractFileStoreWrite<T> implements 
FileStoreWrite<T> {
                 result.add(committable);
 
                 if (committable.isEmpty()) {
-                    if (writerContainer.lastModifiedCommitIdentifier <= 
latestCommittedIdentifier) {
+                    // Condition 1: There is no more record waiting to be 
committed.
+                    // Condition 2: No compaction is in progress. That is, no 
more changelog will be
+                    // produced.
+                    if (writerContainer.lastModifiedCommitIdentifier <= 
latestCommittedIdentifier
+                            && !writerContainer.writer.isCompacting()) {
                         // Clear writer if no update, and if its latest 
modification has committed.
                         //
                         // We need a mechanism to clear writers, otherwise 
there will be more and
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/RecordWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/RecordWriter.java
index 6f46bc58e..d2df1be46 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/RecordWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/RecordWriter.java
@@ -61,6 +61,9 @@ public interface RecordWriter<T> {
      */
     CommitIncrement prepareCommit(boolean waitCompaction) throws Exception;
 
+    /** Check if a compaction is in progress, or if a compaction result 
remains to be fetched. */
+    boolean isCompacting();
+
     /**
      * Sync the writer. The structure related to file reading and writing is 
thread unsafe, there
      * are asynchronous threads inside the writer, which should be synced 
before reading data.
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableWriteTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableWriteTest.java
index aec89ab4c..ccbade9ad 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableWriteTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableWriteTest.java
@@ -21,11 +21,13 @@ package org.apache.paimon.table.sink;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManagerImpl;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.operation.AbstractFileStoreWrite;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.reader.RecordReaderIterator;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
@@ -33,6 +35,8 @@ import org.apache.paimon.schema.SchemaUtils;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.source.StreamTableScan;
+import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
@@ -105,7 +109,12 @@ public class TableWriteTest {
             eventList.add(random.nextInt(eventList.size() + 1), 
Event.EXTRACT_STATE);
         }
 
-        FileStoreTable table = createFileStoreTable();
+        Options conf = new Options();
+        conf.set(CoreOptions.BUCKET, 2);
+        conf.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(4096 * 3));
+        conf.set(CoreOptions.PAGE_SIZE, new MemorySize(4096));
+        FileStoreTable table = createFileStoreTable(conf);
+
         TableWriteImpl<?> write = table.newWrite(commitUser);
         StreamTableCommit commit = table.newCommit(commitUser);
 
@@ -168,12 +177,60 @@ public class TableWriteTest {
         EXTRACT_STATE
     }
 
-    private FileStoreTable createFileStoreTable() throws Exception {
+    @Test
+    public void testChangelogWhenNotWaitForCompaction() throws Exception {
         Options conf = new Options();
-        conf.set(CoreOptions.BUCKET, 2);
+        conf.set(CoreOptions.BUCKET, 1);
         conf.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(4096 * 3));
         conf.set(CoreOptions.PAGE_SIZE, new MemorySize(4096));
+        conf.set(CoreOptions.CHANGELOG_PRODUCER, 
CoreOptions.ChangelogProducer.LOOKUP);
+        FileStoreTable table = createFileStoreTable(conf);
+
+        TableWriteImpl<?> write =
+                table.newWrite(commitUser).withIOManager(new 
IOManagerImpl(tempDir.toString()));
+        StreamTableCommit commit = table.newCommit(commitUser);
+
+        int numPartitions = 5;
+        int numRecordsPerPartition = 10000;
+
+        int commitIdentifier = 0;
+        for (int i = 0; i < numPartitions; i++) {
+            // Write a lot of records, then quickly call prepareCommit many 
times, without waiting
+            // for compaction.
+            // It is very likely that the compaction is not finished.
+            for (int j = 0; j < numRecordsPerPartition; j++) {
+                write.write(GenericRow.of(i, j, (long) j));
+            }
+            for (int j = 0; j < 3; j++) {
+                commitIdentifier++;
+                // Even if there is no new record, if there is an ongoing 
compaction, the
+                // corresponding writer should not be closed.
+                commit.commit(commitIdentifier, write.prepareCommit(false, 
commitIdentifier));
+            }
+        }
+        commit.commit(commitIdentifier, write.prepareCommit(true, 
commitIdentifier));
+
+        write.close();
+        commit.close();
+
+        Map<String, String> readOptions = new HashMap<>();
+        readOptions.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), "1");
+        table = table.copy(readOptions);
+        long latestSnapshotId = table.snapshotManager().latestSnapshotId();
+
+        StreamTableScan scan = table.newStreamScan();
+        TableRead read = table.newRead();
+        List<InternalRow> actual = new ArrayList<>();
+        for (long i = 0; i <= latestSnapshotId; i++) {
+            RecordReader<InternalRow> reader = 
read.createReader(scan.plan().splits());
+            reader.forEachRemaining(actual::add);
+            reader.close();
+        }
+
+        assertThat(actual).hasSize(numPartitions * numRecordsPerPartition);
+    }
 
+    private FileStoreTable createFileStoreTable(Options conf) throws Exception 
{
         TableSchema tableSchema =
                 SchemaUtils.forceCommit(
                         new SchemaManager(LocalFileIO.create(), tablePath),
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
index 8b4f452ef..713bcc341 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
@@ -379,7 +379,9 @@ public class PrimaryKeyFileStoreTableITCase extends 
AbstractTestBase {
                         + String.format(
                                 "'write-buffer-size' = '%s',",
                                 random.nextBoolean() ? "512kb" : "1mb")
-                        + "'changelog-producer' = 'lookup'");
+                        + "'changelog-producer' = 'lookup',"
+                        + String.format(
+                                "'changelog-producer.lookup-wait' = '%s'", 
random.nextBoolean()));
 
         // sleep for a random amount of time to check
         // if we can first read complete records then read incremental records 
correctly
@@ -436,6 +438,8 @@ public class PrimaryKeyFileStoreTableITCase extends 
AbstractTestBase {
                                 "'write-buffer-size' = '%s',",
                                 random.nextBoolean() ? "512kb" : "1mb")
                         + "'changelog-producer' = 'lookup',"
+                        + String.format(
+                                "'changelog-producer.lookup-wait' = '%s',", 
random.nextBoolean())
                         + "'write-only' = 'true'");
 
         // sleep for a random amount of time to check

Reply via email to