This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 295a6e37d1 [test] Fix incorrect test in StoreCompactOperatorTest 
(#5453)
295a6e37d1 is described below

commit 295a6e37d192a670c2f61fbdf50ba378bf7f64b6
Author: tsreaper <[email protected]>
AuthorDate: Fri Apr 11 18:43:13 2025 +0800

    [test] Fix incorrect test in StoreCompactOperatorTest (#5453)
---
 .../paimon/flink/sink/StoreCompactOperator.java       |  8 +++++++-
 .../paimon/flink/sink/StoreCompactOperatorTest.java   | 19 +++++++++++++++----
 2 files changed, 22 insertions(+), 5 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
index 1870a0493c..99ad6c0c00 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.flink.utils.RuntimeContextUtils;
 import org.apache.paimon.io.DataFileMeta;
@@ -63,7 +64,7 @@ public class StoreCompactOperator extends 
PrepareCommitOperator<RowData, Committ
     private transient DataFileMetaSerializer dataFileMetaSerializer;
     private transient Set<Pair<BinaryRow, Integer>> waitToCompact;
 
-    private StoreCompactOperator(
+    public StoreCompactOperator(
             StreamOperatorParameters<Committable> parameters,
             FileStoreTable table,
             StoreSinkWrite.Provider storeSinkWriteProvider,
@@ -168,6 +169,11 @@ public class StoreCompactOperator extends 
PrepareCommitOperator<RowData, Committ
         write.close();
     }
 
+    @VisibleForTesting
+    public Set<Pair<BinaryRow, Integer>> compactionWaitingSet() {
+        return waitToCompact;
+    }
+
     /** {@link StreamOperatorFactory} of {@link StoreCompactOperator}. */
     public static class Factory extends PrepareCommitOperator.Factory<RowData, 
Committable> {
         private final FileStoreTable table;
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
index 3740033e02..bcd493e0d4 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
@@ -26,18 +26,21 @@ import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.TableTestBase;
 import org.apache.paimon.table.sink.SinkRecord;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.SerializationUtils;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.table.data.RowData;
-import org.assertj.core.api.Assertions;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.List;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 /** Test for {@link StoreCompactOperator}. */
 public class StoreCompactOperatorTest extends TableTestBase {
 
@@ -70,15 +73,23 @@ public class StoreCompactOperatorTest extends TableTestBase 
{
         harness.processElement(new StreamRecord<>(data(1)));
         harness.processElement(new StreamRecord<>(data(2)));
 
-        ((StoreCompactOperator) 
harness.getOneInputOperator()).prepareCommit(true, 1);
-        
Assertions.assertThat(compactRememberStoreWrite.compactTime).isEqualTo(3);
+        StoreCompactOperator operator = (StoreCompactOperator) 
harness.getOperator();
+        assertThat(operator.compactionWaitingSet())
+                .containsExactlyInAnyOrder(
+                        Pair.of(BinaryRow.EMPTY_ROW, 0),
+                        Pair.of(BinaryRow.EMPTY_ROW, 1),
+                        Pair.of(BinaryRow.EMPTY_ROW, 2));
+        assertThat(compactRememberStoreWrite.compactTime).isEqualTo(0);
+        operator.prepareCommit(true, 1);
+        assertThat(operator.compactionWaitingSet()).isEmpty();
+        assertThat(compactRememberStoreWrite.compactTime).isEqualTo(3);
     }
 
     private RowData data(int bucket) {
         GenericRow genericRow =
                 GenericRow.of(
                         0L,
-                        BinaryRow.EMPTY_ROW.toBytes(),
+                        
SerializationUtils.serializeBinaryRow(BinaryRow.EMPTY_ROW),
                         bucket,
                         new byte[] {0x00, 0x00, 0x00, 0x00});
         return new FlinkRowData(genericRow);

Reply via email to