This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new bcafd296de [flink] All lookup writers should store active buckets in
state to make sure changelog can be produced (#5796)
bcafd296de is described below
commit bcafd296de732ba078f13b1c204ae4ff447a2ef0
Author: tsreaper <[email protected]>
AuthorDate: Thu Jun 26 16:39:30 2025 +0800
[flink] All lookup writers should store active buckets in state to make
sure changelog can be produced (#5796)
---
.../main/java/org/apache/paimon/CoreOptions.java | 5 --
.../org/apache/paimon/flink/sink/FlinkSink.java | 30 +++++------
...ncLookupSinkWrite.java => LookupSinkWrite.java} | 10 ++--
.../sink/MultiTablesStoreCompactOperator.java | 4 +-
.../flink/PrimaryKeyFileStoreTableITCase.java | 5 +-
.../paimon/flink/sink/WriterOperatorTest.java | 59 +++++++++++++++++-----
6 files changed, 72 insertions(+), 41 deletions(-)
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index f6f298259f..c269a969a9 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2726,11 +2726,6 @@ public class CoreOptions implements Serializable {
return options.get(LOOKUP_WAIT);
}
- public boolean laziedLookup() {
- return needLookup()
- && (!options.get(LOOKUP_WAIT) ||
LookupCompactMode.GENTLE.equals(lookupCompact()));
- }
-
public LookupCompactMode lookupCompact() {
return options.get(LOOKUP_COMPACT);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index f518f0c8d1..3431f34cc5 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -141,22 +141,22 @@ public abstract class FlinkSink<T> implements
Serializable {
metricGroup);
};
}
- }
- if (coreOptions.laziedLookup()) {
- return (table, commitUser, state, ioManager, memoryPool,
metricGroup) -> {
- assertNoSinkMaterializer.run();
- return new AsyncLookupSinkWrite(
- table,
- commitUser,
- state,
- ioManager,
- ignorePreviousFiles,
- waitCompaction,
- isStreaming,
- memoryPool,
- metricGroup);
- };
+ if (coreOptions.needLookup()) {
+ return (table, commitUser, state, ioManager, memoryPool,
metricGroup) -> {
+ assertNoSinkMaterializer.run();
+ return new LookupSinkWrite(
+ table,
+ commitUser,
+ state,
+ ioManager,
+ ignorePreviousFiles,
+ waitCompaction,
+ isStreaming,
+ memoryPool,
+ metricGroup);
+ };
+ }
}
return (table, commitUser, state, ioManager, memoryPool, metricGroup)
-> {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AsyncLookupSinkWrite.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LookupSinkWrite.java
similarity index 93%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AsyncLookupSinkWrite.java
rename to
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LookupSinkWrite.java
index fcb4c89ebe..e6f683d4b1 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AsyncLookupSinkWrite.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LookupSinkWrite.java
@@ -32,17 +32,15 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-/**
- * {@link StoreSinkWrite} for tables with lookup changelog producer and {@link
- * org.apache.paimon.CoreOptions#LOOKUP_WAIT} set to false.
- */
-public class AsyncLookupSinkWrite extends StoreSinkWriteImpl {
+/** {@link StoreSinkWrite} for tables with lookup changelog producer. */
+public class LookupSinkWrite extends StoreSinkWriteImpl {
+ // keep this state name for compatibility reasons
private static final String ACTIVE_BUCKETS_STATE_NAME =
"paimon_async_lookup_active_buckets";
private final String tableName;
- public AsyncLookupSinkWrite(
+ public LookupSinkWrite(
FileStoreTable table,
String commitUser,
StoreSinkWriteState state,
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
index e1eb9ac7d6..0dad216ab0 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
@@ -297,9 +297,9 @@ public class MultiTablesStoreCompactOperator
}
}
- if (coreOptions.needLookup() &&
!coreOptions.prepareCommitWaitCompaction()) {
+ if (coreOptions.needLookup()) {
return (table, commitUser, state, ioManager, memoryPool,
metricGroup) ->
- new AsyncLookupSinkWrite(
+ new LookupSinkWrite(
table,
commitUser,
state,
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
index c64bb4ef41..23bdb1628c 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
@@ -66,7 +66,10 @@ import static org.assertj.core.api.Assertions.assertThatCode;
/** Tests for changelog table with primary keys. */
public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
- private static final int TIMEOUT = 480;
+ // This test is to discover unknown consistency bugs.
+ // When this test fails, look carefully into its core reason.
+ // Do not simply increase the timeout and pretend that everything is OK.
+ private static final int TIMEOUT = 180;
private static final Logger LOG =
LoggerFactory.getLogger(PrimaryKeyFileStoreTableITCase.class);
// ------------------------------------------------------------------------
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
index bc289a2987..757d25de1c 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
@@ -26,6 +26,8 @@ import org.apache.paimon.flink.utils.InternalTypeInfo;
import org.apache.paimon.flink.utils.TestingMetricUtils;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.IndexIncrement;
import org.apache.paimon.options.Options;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.Schema;
@@ -33,6 +35,7 @@ import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
@@ -54,12 +57,15 @@ import
org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
@@ -158,8 +164,9 @@ public class WriterOperatorTest {
harness.close();
}
- @Test
- public void testAsyncLookupWithFailure() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testLookupWithFailure(boolean lookupWait) throws Exception {
RowType rowType =
RowType.of(
new DataType[] {DataTypes.INT(), DataTypes.INT(),
DataTypes.INT()},
@@ -173,9 +180,8 @@ public class WriterOperatorTest {
createFileStoreTable(
rowType, Arrays.asList("pt", "k"),
Collections.singletonList("k"), options);
- // we don't wait for compaction because this is async lookup test
RowDataStoreWriteOperator.Factory operatorFactory =
- getAsyncLookupWriteOperatorFactory(fileStoreTable, false);
+ getLookupWriteOperatorFactory(fileStoreTable, lookupWait);
OneInputStreamOperatorTestHarness<InternalRow, Committable> harness =
createHarness(operatorFactory);
@@ -195,19 +201,24 @@ public class WriterOperatorTest {
harness.notifyOfCompletedCheckpoint(1);
commitAll(harness, commit, 1);
- // apply changes but does not wait for compaction
+ // apply changes but does not wait for compaction (lookupWait == false)
+ // or does not commit compact result
harness.processElement(GenericRow.of(1, 10, 101), 11);
harness.processElement(GenericRow.of(3, 30, 301), 13);
harness.prepareSnapshotPreBarrier(2);
OperatorSubtaskState state = harness.snapshot(2, 20);
harness.notifyOfCompletedCheckpoint(2);
- commitAll(harness, commit, 2);
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ commitAll(harness, commit, 2);
+ } else {
+ commitAppend(harness, commit, 2);
+ }
// operator is closed due to failure
harness.close();
// re-create operator from state, this time wait for compaction to
check result
- operatorFactory = getAsyncLookupWriteOperatorFactory(fileStoreTable,
true);
+ operatorFactory = getLookupWriteOperatorFactory(fileStoreTable, true);
harness = createHarness(operatorFactory);
harness.setup(serializer);
harness.initializeState(state);
@@ -262,7 +273,7 @@ public class WriterOperatorTest {
rowType, Arrays.asList("pt", "k"),
Collections.singletonList("k"), options);
RowDataStoreWriteOperator.Factory operatorFactory =
- getAsyncLookupWriteOperatorFactory(fileStoreTable, false);
+ getLookupWriteOperatorFactory(fileStoreTable, false);
OneInputStreamOperatorTestHarness<InternalRow, Committable> harness =
createHarness(operatorFactory);
@@ -293,7 +304,7 @@ public class WriterOperatorTest {
harness.close();
// restore operator to trigger gentle lookup compaction
- operatorFactory = getAsyncLookupWriteOperatorFactory(fileStoreTable,
true);
+ operatorFactory = getLookupWriteOperatorFactory(fileStoreTable, true);
harness = createHarness(operatorFactory);
harness.setup(serializer);
harness.initializeState(state);
@@ -326,7 +337,7 @@ public class WriterOperatorTest {
assertThat(actual).isEmpty();
// restore operator to force trigger gentle lookup compaction
- operatorFactory = getAsyncLookupWriteOperatorFactory(fileStoreTable,
true);
+ operatorFactory = getLookupWriteOperatorFactory(fileStoreTable, true);
harness = createHarness(operatorFactory);
harness.setup(serializer);
harness.initializeState(state);
@@ -534,13 +545,13 @@ public class WriterOperatorTest {
commitUser);
}
- private RowDataStoreWriteOperator.Factory
getAsyncLookupWriteOperatorFactory(
+ private RowDataStoreWriteOperator.Factory getLookupWriteOperatorFactory(
FileStoreTable fileStoreTable, boolean waitCompaction) {
return new RowDataStoreWriteOperator.Factory(
fileStoreTable,
null,
(table, commitUser, state, ioManager, memoryPool, metricGroup)
->
- new AsyncLookupSinkWrite(
+ new LookupSinkWrite(
table,
commitUser,
state,
@@ -568,6 +579,30 @@ public class WriterOperatorTest {
commit.commit(commitIdentifier, commitMessages);
}
+ @SuppressWarnings("unchecked")
+ private void commitAppend(
+ OneInputStreamOperatorTestHarness<InternalRow, Committable>
harness,
+ TableCommitImpl commit,
+ long commitIdentifier) {
+ List<CommitMessage> commitMessages = new ArrayList<>();
+ while (!harness.getOutput().isEmpty()) {
+ Committable committable =
+ ((StreamRecord<Committable>)
harness.getOutput().poll()).getValue();
+ assertThat(committable.kind()).isEqualTo(Committable.Kind.FILE);
+ CommitMessageImpl message = (CommitMessageImpl)
committable.wrappedCommittable();
+ CommitMessageImpl newMessage =
+ new CommitMessageImpl(
+ message.partition(),
+ message.bucket(),
+ message.totalBuckets(),
+ message.newFilesIncrement(),
+ CompactIncrement.emptyIncrement(),
+ new IndexIncrement(Collections.emptyList()));
+ commitMessages.add(newMessage);
+ }
+ commit.commit(commitIdentifier, commitMessages);
+ }
+
private FileStoreTable createFileStoreTable(
RowType rowType, List<String> primaryKeys, List<String>
partitionKeys, Options conf)
throws Exception {