This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new bcafd296de [flink] All lookup writers should store active buckets in 
state to make sure changelog can be produced (#5796)
bcafd296de is described below

commit bcafd296de732ba078f13b1c204ae4ff447a2ef0
Author: tsreaper <[email protected]>
AuthorDate: Thu Jun 26 16:39:30 2025 +0800

    [flink] All lookup writers should store active buckets in state to make 
sure changelog can be produced (#5796)
---
 .../main/java/org/apache/paimon/CoreOptions.java   |  5 --
 .../org/apache/paimon/flink/sink/FlinkSink.java    | 30 +++++------
 ...ncLookupSinkWrite.java => LookupSinkWrite.java} | 10 ++--
 .../sink/MultiTablesStoreCompactOperator.java      |  4 +-
 .../flink/PrimaryKeyFileStoreTableITCase.java      |  5 +-
 .../paimon/flink/sink/WriterOperatorTest.java      | 59 +++++++++++++++++-----
 6 files changed, 72 insertions(+), 41 deletions(-)

diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index f6f298259f..c269a969a9 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2726,11 +2726,6 @@ public class CoreOptions implements Serializable {
         return options.get(LOOKUP_WAIT);
     }
 
-    public boolean laziedLookup() {
-        return needLookup()
-                && (!options.get(LOOKUP_WAIT) || 
LookupCompactMode.GENTLE.equals(lookupCompact()));
-    }
-
     public LookupCompactMode lookupCompact() {
         return options.get(LOOKUP_COMPACT);
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index f518f0c8d1..3431f34cc5 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -141,22 +141,22 @@ public abstract class FlinkSink<T> implements 
Serializable {
                             metricGroup);
                 };
             }
-        }
 
-        if (coreOptions.laziedLookup()) {
-            return (table, commitUser, state, ioManager, memoryPool, 
metricGroup) -> {
-                assertNoSinkMaterializer.run();
-                return new AsyncLookupSinkWrite(
-                        table,
-                        commitUser,
-                        state,
-                        ioManager,
-                        ignorePreviousFiles,
-                        waitCompaction,
-                        isStreaming,
-                        memoryPool,
-                        metricGroup);
-            };
+            if (coreOptions.needLookup()) {
+                return (table, commitUser, state, ioManager, memoryPool, 
metricGroup) -> {
+                    assertNoSinkMaterializer.run();
+                    return new LookupSinkWrite(
+                            table,
+                            commitUser,
+                            state,
+                            ioManager,
+                            ignorePreviousFiles,
+                            waitCompaction,
+                            isStreaming,
+                            memoryPool,
+                            metricGroup);
+                };
+            }
         }
 
         return (table, commitUser, state, ioManager, memoryPool, metricGroup) 
-> {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AsyncLookupSinkWrite.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LookupSinkWrite.java
similarity index 93%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AsyncLookupSinkWrite.java
rename to 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LookupSinkWrite.java
index fcb4c89ebe..e6f683d4b1 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AsyncLookupSinkWrite.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LookupSinkWrite.java
@@ -32,17 +32,15 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-/**
- * {@link StoreSinkWrite} for tables with lookup changelog producer and {@link
- * org.apache.paimon.CoreOptions#LOOKUP_WAIT} set to false.
- */
-public class AsyncLookupSinkWrite extends StoreSinkWriteImpl {
+/** {@link StoreSinkWrite} for tables with lookup changelog producer. */
+public class LookupSinkWrite extends StoreSinkWriteImpl {
 
+    // keep this state name for compatibility reasons
     private static final String ACTIVE_BUCKETS_STATE_NAME = 
"paimon_async_lookup_active_buckets";
 
     private final String tableName;
 
-    public AsyncLookupSinkWrite(
+    public LookupSinkWrite(
             FileStoreTable table,
             String commitUser,
             StoreSinkWriteState state,
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
index e1eb9ac7d6..0dad216ab0 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
@@ -297,9 +297,9 @@ public class MultiTablesStoreCompactOperator
             }
         }
 
-        if (coreOptions.needLookup() && 
!coreOptions.prepareCommitWaitCompaction()) {
+        if (coreOptions.needLookup()) {
             return (table, commitUser, state, ioManager, memoryPool, 
metricGroup) ->
-                    new AsyncLookupSinkWrite(
+                    new LookupSinkWrite(
                             table,
                             commitUser,
                             state,
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
index c64bb4ef41..23bdb1628c 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
@@ -66,7 +66,10 @@ import static org.assertj.core.api.Assertions.assertThatCode;
 /** Tests for changelog table with primary keys. */
 public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
 
-    private static final int TIMEOUT = 480;
+    // This test is to discover unknown consistency bugs.
+    // When this test fails, look carefully into its core reason.
+    // Do not simply increase the timeout and pretend that everything is OK.
+    private static final int TIMEOUT = 180;
     private static final Logger LOG = 
LoggerFactory.getLogger(PrimaryKeyFileStoreTableITCase.class);
 
     // ------------------------------------------------------------------------
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
index bc289a2987..757d25de1c 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
@@ -26,6 +26,8 @@ import org.apache.paimon.flink.utils.InternalTypeInfo;
 import org.apache.paimon.flink.utils.TestingMetricUtils;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.IndexIncrement;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.Schema;
@@ -33,6 +35,7 @@ import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.table.sink.TableCommitImpl;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
@@ -54,12 +57,15 @@ import 
org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -158,8 +164,9 @@ public class WriterOperatorTest {
         harness.close();
     }
 
-    @Test
-    public void testAsyncLookupWithFailure() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testLookupWithFailure(boolean lookupWait) throws Exception {
         RowType rowType =
                 RowType.of(
                         new DataType[] {DataTypes.INT(), DataTypes.INT(), 
DataTypes.INT()},
@@ -173,9 +180,8 @@ public class WriterOperatorTest {
                 createFileStoreTable(
                         rowType, Arrays.asList("pt", "k"), 
Collections.singletonList("k"), options);
 
-        // we don't wait for compaction because this is async lookup test
         RowDataStoreWriteOperator.Factory operatorFactory =
-                getAsyncLookupWriteOperatorFactory(fileStoreTable, false);
+                getLookupWriteOperatorFactory(fileStoreTable, lookupWait);
         OneInputStreamOperatorTestHarness<InternalRow, Committable> harness =
                 createHarness(operatorFactory);
 
@@ -195,19 +201,24 @@ public class WriterOperatorTest {
         harness.notifyOfCompletedCheckpoint(1);
         commitAll(harness, commit, 1);
 
-        // apply changes but does not wait for compaction
+        // apply changes but does not wait for compaction (lookupWait == false)
+        // or does not commit compact result
         harness.processElement(GenericRow.of(1, 10, 101), 11);
         harness.processElement(GenericRow.of(3, 30, 301), 13);
         harness.prepareSnapshotPreBarrier(2);
         OperatorSubtaskState state = harness.snapshot(2, 20);
         harness.notifyOfCompletedCheckpoint(2);
-        commitAll(harness, commit, 2);
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            commitAll(harness, commit, 2);
+        } else {
+            commitAppend(harness, commit, 2);
+        }
 
         // operator is closed due to failure
         harness.close();
 
         // re-create operator from state, this time wait for compaction to 
check result
-        operatorFactory = getAsyncLookupWriteOperatorFactory(fileStoreTable, 
true);
+        operatorFactory = getLookupWriteOperatorFactory(fileStoreTable, true);
         harness = createHarness(operatorFactory);
         harness.setup(serializer);
         harness.initializeState(state);
@@ -262,7 +273,7 @@ public class WriterOperatorTest {
                         rowType, Arrays.asList("pt", "k"), 
Collections.singletonList("k"), options);
 
         RowDataStoreWriteOperator.Factory operatorFactory =
-                getAsyncLookupWriteOperatorFactory(fileStoreTable, false);
+                getLookupWriteOperatorFactory(fileStoreTable, false);
         OneInputStreamOperatorTestHarness<InternalRow, Committable> harness =
                 createHarness(operatorFactory);
 
@@ -293,7 +304,7 @@ public class WriterOperatorTest {
         harness.close();
 
         // restore operator to trigger gentle lookup compaction
-        operatorFactory = getAsyncLookupWriteOperatorFactory(fileStoreTable, 
true);
+        operatorFactory = getLookupWriteOperatorFactory(fileStoreTable, true);
         harness = createHarness(operatorFactory);
         harness.setup(serializer);
         harness.initializeState(state);
@@ -326,7 +337,7 @@ public class WriterOperatorTest {
         assertThat(actual).isEmpty();
 
         // restore operator to force trigger gentle lookup compaction
-        operatorFactory = getAsyncLookupWriteOperatorFactory(fileStoreTable, 
true);
+        operatorFactory = getLookupWriteOperatorFactory(fileStoreTable, true);
         harness = createHarness(operatorFactory);
         harness.setup(serializer);
         harness.initializeState(state);
@@ -534,13 +545,13 @@ public class WriterOperatorTest {
                 commitUser);
     }
 
-    private RowDataStoreWriteOperator.Factory 
getAsyncLookupWriteOperatorFactory(
+    private RowDataStoreWriteOperator.Factory getLookupWriteOperatorFactory(
             FileStoreTable fileStoreTable, boolean waitCompaction) {
         return new RowDataStoreWriteOperator.Factory(
                 fileStoreTable,
                 null,
                 (table, commitUser, state, ioManager, memoryPool, metricGroup) 
->
-                        new AsyncLookupSinkWrite(
+                        new LookupSinkWrite(
                                 table,
                                 commitUser,
                                 state,
@@ -568,6 +579,30 @@ public class WriterOperatorTest {
         commit.commit(commitIdentifier, commitMessages);
     }
 
+    @SuppressWarnings("unchecked")
+    private void commitAppend(
+            OneInputStreamOperatorTestHarness<InternalRow, Committable> 
harness,
+            TableCommitImpl commit,
+            long commitIdentifier) {
+        List<CommitMessage> commitMessages = new ArrayList<>();
+        while (!harness.getOutput().isEmpty()) {
+            Committable committable =
+                    ((StreamRecord<Committable>) 
harness.getOutput().poll()).getValue();
+            assertThat(committable.kind()).isEqualTo(Committable.Kind.FILE);
+            CommitMessageImpl message = (CommitMessageImpl) 
committable.wrappedCommittable();
+            CommitMessageImpl newMessage =
+                    new CommitMessageImpl(
+                            message.partition(),
+                            message.bucket(),
+                            message.totalBuckets(),
+                            message.newFilesIncrement(),
+                            CompactIncrement.emptyIncrement(),
+                            new IndexIncrement(Collections.emptyList()));
+            commitMessages.add(newMessage);
+        }
+        commit.commit(commitIdentifier, commitMessages);
+    }
+
     private FileStoreTable createFileStoreTable(
             RowType rowType, List<String> primaryKeys, List<String> 
partitionKeys, Options conf)
             throws Exception {

Reply via email to