This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 295a6e37d1 [test] Fix incorrect test in StoreCompactOperatorTest
(#5453)
295a6e37d1 is described below
commit 295a6e37d192a670c2f61fbdf50ba378bf7f64b6
Author: tsreaper <[email protected]>
AuthorDate: Fri Apr 11 18:43:13 2025 +0800
[test] Fix incorrect test in StoreCompactOperatorTest (#5453)
---
.../paimon/flink/sink/StoreCompactOperator.java | 8 +++++++-
.../paimon/flink/sink/StoreCompactOperatorTest.java | 19 +++++++++++++++----
2 files changed, 22 insertions(+), 5 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
index 1870a0493c..99ad6c0c00 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.sink;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.utils.RuntimeContextUtils;
import org.apache.paimon.io.DataFileMeta;
@@ -63,7 +64,7 @@ public class StoreCompactOperator extends
PrepareCommitOperator<RowData, Committ
private transient DataFileMetaSerializer dataFileMetaSerializer;
private transient Set<Pair<BinaryRow, Integer>> waitToCompact;
- private StoreCompactOperator(
+ public StoreCompactOperator(
StreamOperatorParameters<Committable> parameters,
FileStoreTable table,
StoreSinkWrite.Provider storeSinkWriteProvider,
@@ -168,6 +169,11 @@ public class StoreCompactOperator extends
PrepareCommitOperator<RowData, Committ
write.close();
}
+ @VisibleForTesting
+ public Set<Pair<BinaryRow, Integer>> compactionWaitingSet() {
+ return waitToCompact;
+ }
+
/** {@link StreamOperatorFactory} of {@link StoreCompactOperator}. */
public static class Factory extends PrepareCommitOperator.Factory<RowData,
Committable> {
private final FileStoreTable table;
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
index 3740033e02..bcd493e0d4 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
@@ -26,18 +26,21 @@ import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.table.sink.SinkRecord;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.SerializationUtils;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
-import org.assertj.core.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.util.List;
+import static org.assertj.core.api.Assertions.assertThat;
+
/** Test for {@link StoreCompactOperator}. */
public class StoreCompactOperatorTest extends TableTestBase {
@@ -70,15 +73,23 @@ public class StoreCompactOperatorTest extends TableTestBase
{
harness.processElement(new StreamRecord<>(data(1)));
harness.processElement(new StreamRecord<>(data(2)));
- ((StoreCompactOperator)
harness.getOneInputOperator()).prepareCommit(true, 1);
-
Assertions.assertThat(compactRememberStoreWrite.compactTime).isEqualTo(3);
+ StoreCompactOperator operator = (StoreCompactOperator)
harness.getOperator();
+ assertThat(operator.compactionWaitingSet())
+ .containsExactlyInAnyOrder(
+ Pair.of(BinaryRow.EMPTY_ROW, 0),
+ Pair.of(BinaryRow.EMPTY_ROW, 1),
+ Pair.of(BinaryRow.EMPTY_ROW, 2));
+ assertThat(compactRememberStoreWrite.compactTime).isEqualTo(0);
+ operator.prepareCommit(true, 1);
+ assertThat(operator.compactionWaitingSet()).isEmpty();
+ assertThat(compactRememberStoreWrite.compactTime).isEqualTo(3);
}
private RowData data(int bucket) {
GenericRow genericRow =
GenericRow.of(
0L,
- BinaryRow.EMPTY_ROW.toBytes(),
+
SerializationUtils.serializeBinaryRow(BinaryRow.EMPTY_ROW),
bucket,
new byte[] {0x00, 0x00, 0x00, 0x00});
return new FlinkRowData(genericRow);