This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5e2200201 [core] Fix bug that changelog may not be produced when
setting changelog-producer.lookup-wait to false (#2885)
5e2200201 is described below
commit 5e2200201fa72ba420a0abf14160e7ef98f2a8e1
Author: tsreaper <[email protected]>
AuthorDate: Thu Feb 22 11:37:46 2024 +0800
[core] Fix bug that changelog may not be produced when setting
changelog-producer.lookup-wait to false (#2885)
---
.../org/apache/paimon/append/AppendOnlyWriter.java | 5 ++
.../paimon/compact/CompactFutureManager.java | 5 ++
.../org/apache/paimon/compact/CompactManager.java | 3 ++
.../apache/paimon/compact/NoopCompactManager.java | 5 ++
.../apache/paimon/mergetree/MergeTreeWriter.java | 5 ++
.../paimon/operation/AbstractFileStoreWrite.java | 6 ++-
.../java/org/apache/paimon/utils/RecordWriter.java | 3 ++
.../apache/paimon/table/sink/TableWriteTest.java | 63 ++++++++++++++++++++--
.../flink/PrimaryKeyFileStoreTableITCase.java | 6 ++-
9 files changed, 96 insertions(+), 5 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index 302ae129c..a2f614291 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -172,6 +172,11 @@ public class AppendOnlyWriter implements
RecordWriter<InternalRow>, MemoryOwner
return increment;
}
+ @Override
+ public boolean isCompacting() {
+ return compactManager.isCompacting();
+ }
+
private void flush(boolean waitForLatestCompaction, boolean
forcedFullCompaction)
throws Exception {
long start = System.currentTimeMillis();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/compact/CompactFutureManager.java
b/paimon-core/src/main/java/org/apache/paimon/compact/CompactFutureManager.java
index c8a70e427..7a8dc8da0 100644
---
a/paimon-core/src/main/java/org/apache/paimon/compact/CompactFutureManager.java
+++
b/paimon-core/src/main/java/org/apache/paimon/compact/CompactFutureManager.java
@@ -44,6 +44,11 @@ public abstract class CompactFutureManager implements
CompactManager {
}
}
+ @Override
+ public boolean isCompacting() {
+ return taskFuture != null;
+ }
+
protected final Optional<CompactResult> innerGetCompactionResult(boolean
blocking)
throws ExecutionException, InterruptedException {
if (taskFuture != null) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/compact/CompactManager.java
b/paimon-core/src/main/java/org/apache/paimon/compact/CompactManager.java
index c8c89d3c3..88897bd86 100644
--- a/paimon-core/src/main/java/org/apache/paimon/compact/CompactManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/compact/CompactManager.java
@@ -51,4 +51,7 @@ public interface CompactManager extends Closeable {
/** Cancel currently running compaction task. */
void cancelCompaction();
+
+ /** Check if a compaction is in progress, or if a compaction result
remains to be fetched. */
+ boolean isCompacting();
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/compact/NoopCompactManager.java
b/paimon-core/src/main/java/org/apache/paimon/compact/NoopCompactManager.java
index bf1b2c79c..6f84105f3 100644
---
a/paimon-core/src/main/java/org/apache/paimon/compact/NoopCompactManager.java
+++
b/paimon-core/src/main/java/org/apache/paimon/compact/NoopCompactManager.java
@@ -70,6 +70,11 @@ public class NoopCompactManager implements CompactManager {
@Override
public void cancelCompaction() {}
+ @Override
+ public boolean isCompacting() {
+ return false;
+ }
+
@Override
public void close() throws IOException {}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
index 055ffe7d5..d12796c74 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
@@ -258,6 +258,11 @@ public class MergeTreeWriter implements
RecordWriter<KeyValue>, MemoryOwner {
return increment;
}
+ @Override
+ public boolean isCompacting() {
+ return compactManager.isCompacting();
+ }
+
@Override
public void sync() throws Exception {
trySyncLatestCompaction(true);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index da7fba046..7b119b858 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -211,7 +211,11 @@ public abstract class AbstractFileStoreWrite<T> implements
FileStoreWrite<T> {
result.add(committable);
if (committable.isEmpty()) {
- if (writerContainer.lastModifiedCommitIdentifier <=
latestCommittedIdentifier) {
+                    // Condition 1: There are no more records waiting to be
committed.
+ // Condition 2: No compaction is in progress. That is, no
more changelog will be
+ // produced.
+ if (writerContainer.lastModifiedCommitIdentifier <=
latestCommittedIdentifier
+ && !writerContainer.writer.isCompacting()) {
// Clear writer if no update, and if its latest
modification has committed.
//
// We need a mechanism to clear writers, otherwise
there will be more and
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/RecordWriter.java
b/paimon-core/src/main/java/org/apache/paimon/utils/RecordWriter.java
index 6f46bc58e..d2df1be46 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/RecordWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/RecordWriter.java
@@ -61,6 +61,9 @@ public interface RecordWriter<T> {
*/
CommitIncrement prepareCommit(boolean waitCompaction) throws Exception;
+ /** Check if a compaction is in progress, or if a compaction result
remains to be fetched. */
+ boolean isCompacting();
+
/**
* Sync the writer. The structure related to file reading and writing is
thread unsafe, there
* are asynchronous threads inside the writer, which should be synced
before reading data.
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableWriteTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableWriteTest.java
index aec89ab4c..ccbade9ad 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableWriteTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableWriteTest.java
@@ -21,11 +21,13 @@ package org.apache.paimon.table.sink;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManagerImpl;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.operation.AbstractFileStoreWrite;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
@@ -33,6 +35,8 @@ import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.source.StreamTableScan;
+import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
@@ -105,7 +109,12 @@ public class TableWriteTest {
eventList.add(random.nextInt(eventList.size() + 1),
Event.EXTRACT_STATE);
}
- FileStoreTable table = createFileStoreTable();
+ Options conf = new Options();
+ conf.set(CoreOptions.BUCKET, 2);
+ conf.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(4096 * 3));
+ conf.set(CoreOptions.PAGE_SIZE, new MemorySize(4096));
+ FileStoreTable table = createFileStoreTable(conf);
+
TableWriteImpl<?> write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);
@@ -168,12 +177,60 @@ public class TableWriteTest {
EXTRACT_STATE
}
- private FileStoreTable createFileStoreTable() throws Exception {
+ @Test
+ public void testChangelogWhenNotWaitForCompaction() throws Exception {
Options conf = new Options();
- conf.set(CoreOptions.BUCKET, 2);
+ conf.set(CoreOptions.BUCKET, 1);
conf.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(4096 * 3));
conf.set(CoreOptions.PAGE_SIZE, new MemorySize(4096));
+ conf.set(CoreOptions.CHANGELOG_PRODUCER,
CoreOptions.ChangelogProducer.LOOKUP);
+ FileStoreTable table = createFileStoreTable(conf);
+
+ TableWriteImpl<?> write =
+ table.newWrite(commitUser).withIOManager(new
IOManagerImpl(tempDir.toString()));
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ int numPartitions = 5;
+ int numRecordsPerPartition = 10000;
+
+ int commitIdentifier = 0;
+ for (int i = 0; i < numPartitions; i++) {
+ // Write a lot of records, then quickly call prepareCommit many
times, without waiting
+ // for compaction.
+ // It is very likely that the compaction is not finished.
+ for (int j = 0; j < numRecordsPerPartition; j++) {
+ write.write(GenericRow.of(i, j, (long) j));
+ }
+ for (int j = 0; j < 3; j++) {
+ commitIdentifier++;
+ // Even if there is no new record, if there is an ongoing
compaction, the
+ // corresponding writer should not be closed.
+ commit.commit(commitIdentifier, write.prepareCommit(false,
commitIdentifier));
+ }
+ }
+ commit.commit(commitIdentifier, write.prepareCommit(true,
commitIdentifier));
+
+ write.close();
+ commit.close();
+
+ Map<String, String> readOptions = new HashMap<>();
+ readOptions.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), "1");
+ table = table.copy(readOptions);
+ long latestSnapshotId = table.snapshotManager().latestSnapshotId();
+
+ StreamTableScan scan = table.newStreamScan();
+ TableRead read = table.newRead();
+ List<InternalRow> actual = new ArrayList<>();
+ for (long i = 0; i <= latestSnapshotId; i++) {
+ RecordReader<InternalRow> reader =
read.createReader(scan.plan().splits());
+ reader.forEachRemaining(actual::add);
+ reader.close();
+ }
+
+ assertThat(actual).hasSize(numPartitions * numRecordsPerPartition);
+ }
+ private FileStoreTable createFileStoreTable(Options conf) throws Exception
{
TableSchema tableSchema =
SchemaUtils.forceCommit(
new SchemaManager(LocalFileIO.create(), tablePath),
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
index 8b4f452ef..713bcc341 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
@@ -379,7 +379,9 @@ public class PrimaryKeyFileStoreTableITCase extends
AbstractTestBase {
+ String.format(
"'write-buffer-size' = '%s',",
random.nextBoolean() ? "512kb" : "1mb")
- + "'changelog-producer' = 'lookup'");
+ + "'changelog-producer' = 'lookup',"
+ + String.format(
+ "'changelog-producer.lookup-wait' = '%s'",
random.nextBoolean()));
// sleep for a random amount of time to check
// if we can first read complete records then read incremental records
correctly
@@ -436,6 +438,8 @@ public class PrimaryKeyFileStoreTableITCase extends
AbstractTestBase {
"'write-buffer-size' = '%s',",
random.nextBoolean() ? "512kb" : "1mb")
+ "'changelog-producer' = 'lookup',"
+ + String.format(
+ "'changelog-producer.lookup-wait' = '%s',",
random.nextBoolean())
+ "'write-only' = 'true'");
// sleep for a random amount of time to check