This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d81240d64 [test] Fix the unstable random tests in PrimaryKeyFileStoreTableITCase (#4933)
3d81240d64 is described below

commit 3d81240d646d6637af86ea2554c192409edcc460
Author: tsreaper <[email protected]>
AuthorDate: Thu Jan 16 19:18:13 2025 +0800

    [test] Fix the unstable random tests in PrimaryKeyFileStoreTableITCase (#4933)
---
 .../flink/PrimaryKeyFileStoreTableITCase.java      | 39 ++++++++++++++++++----
 1 file changed, 32 insertions(+), 7 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
index 839a45a6e4..0b243558dd 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
@@ -28,6 +28,8 @@ import org.apache.paimon.utils.FailingFileIO;
 import org.apache.paimon.utils.TraceableFileIO;
 
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableEnvironment;
@@ -37,7 +39,6 @@ import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.CloseableIterator;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -787,7 +788,6 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
         testFullCompactionChangelogProducerRandom(bEnv, 1, false);
     }
 
-    @Disabled // TODO: fix this unstable test
     @Test
     @Timeout(TIMEOUT)
     public void testFullCompactionChangelogProducerStreamingRandom() throws Exception {
@@ -1088,6 +1088,29 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
         tEnv.getConfig()
                 .getConfiguration()
                 .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+
+        // We use a large number of rows to mimic unbounded streams because there is a known
+        // consistency issue in bounded streams.
+        //
+        // For bounded streams, if COMPACT snapshot fails to commit when the stream ends (due to
+        // conflict or whatever reasons), we have no chance to modify the compaction result, so the
+        // changelogs produced by compaction will not be committed.
+        //
+        // If it happens in production, users can run another job to compact the table, or run
+        // another job to write more data into the table. These remaining changelogs will be
+        // produced again.
+        int factor;
+        RuntimeExecutionMode mode =
+                tEnv.getConfig().getConfiguration().get(ExecutionOptions.RUNTIME_MODE);
+        if (mode == RuntimeExecutionMode.BATCH) {
+            factor = 1;
+        } else if (mode == RuntimeExecutionMode.STREAMING) {
+            factor = 10;
+        } else {
+            throw new UnsupportedOperationException(
+                    "Unknown runtime execution mode " + mode.name());
+        }
+        int usefulNumRows = LIMIT + NUM_PARTS * NUM_KEYS;
         tEnv.executeSql(
                         "CREATE TABLE `default_catalog`.`default_database`.`S` 
("
                                 + "  i INT"
@@ -1096,10 +1119,10 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
                                 + "  'fields.i.kind' = 'sequence',"
                                 + "  'fields.i.start' = '0',"
                                 + "  'fields.i.end' = '"
-                                + (LIMIT + NUM_PARTS * NUM_KEYS - 1)
+                                + (usefulNumRows - 1) * factor
                                 + "',"
                                 + "  'number-of-rows' = '"
-                                + (LIMIT + NUM_PARTS * NUM_KEYS)
+                                + usefulNumRows * factor
                                 + "',"
                                 + "  'rows-per-second' = '"
                                 + (LIMIT / 20 + ThreadLocalRandom.current().nextInt(LIMIT / 20))
@@ -1129,7 +1152,7 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
             String v2Sql = "CAST(i AS STRING) || '.str' AS v2";
             tEnv.executeSql(
                     String.format(
-                            "CREATE TEMPORARY VIEW myView%d AS SELECT %s, %s, %s, %s FROM `default_catalog`.`default_database`.`S`",
+                            "CREATE TEMPORARY VIEW myView%d AS SELECT %s, %s, %s, %s, i FROM `default_catalog`.`default_database`.`S`",
                             i, ptSql, kSql, v1Sql, v2Sql));
 
             // run test SQL
@@ -1138,8 +1161,10 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
                     FailingFileIO.retryArtificialException(
                             () ->
                                     tEnv.executeSql(
-                                            "INSERT INTO T /*+ OPTIONS('sink.parallelism' = '2') */ SELECT * FROM myView"
-                                                    + idx));
+                                            "INSERT INTO T /*+ OPTIONS('sink.parallelism' = '2') */ SELECT pt, k, v1, v2 FROM myView"
+                                                    + idx
+                                                    + " WHERE i < "
+                                                    + usefulNumRows));
             results.add(result);
         }
 

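For readers skimming the patch, the heart of the fix is the runtime-mode-dependent row scaling. Below is a condensed, self-contained sketch of that logic; the constant values are stand-ins for illustration (the real LIMIT, NUM_PARTS and NUM_KEYS are defined elsewhere in PrimaryKeyFileStoreTableITCase), and only RuntimeExecutionMode is taken from Flink.

import org.apache.flink.api.common.RuntimeExecutionMode;

// Sketch of the row-scaling logic introduced by the patch above.
public class RowScalingSketch {

    static final int LIMIT = 10_000; // stand-in value for illustration
    static final int NUM_PARTS = 4;  // stand-in value
    static final int NUM_KEYS = 64;  // stand-in value

    // Rows the assertions actually need; anything beyond only keeps the job running.
    static final int USEFUL_NUM_ROWS = LIMIT + NUM_PARTS * NUM_KEYS;

    // BATCH sizes the bounded source exactly to the useful data; STREAMING
    // over-provisions 10x so the job keeps committing COMPACT snapshots well
    // past the last useful row, sidestepping the bounded-stream issue
    // described in the comment added by the patch.
    static int factorFor(RuntimeExecutionMode mode) {
        switch (mode) {
            case BATCH:
                return 1;
            case STREAMING:
                return 10;
            default:
                throw new UnsupportedOperationException(
                        "Unknown runtime execution mode " + mode.name());
        }
    }

    public static void main(String[] args) {
        for (RuntimeExecutionMode mode :
                new RuntimeExecutionMode[] {
                    RuntimeExecutionMode.BATCH, RuntimeExecutionMode.STREAMING
                }) {
            int factor = factorFor(mode);
            // Mirrors the datagen options in the patch:
            //   'number-of-rows' = USEFUL_NUM_ROWS * factor
            //   'fields.i.end'   = (USEFUL_NUM_ROWS - 1) * factor
            // while the INSERT filters with WHERE i < USEFUL_NUM_ROWS, so the
            // sink only ever receives the first USEFUL_NUM_ROWS rows.
            System.out.printf(
                    "%s: number-of-rows=%d, fields.i.end=%d, rows reaching sink=%d%n",
                    mode,
                    USEFUL_NUM_ROWS * factor,
                    (USEFUL_NUM_ROWS - 1) * factor,
                    USEFUL_NUM_ROWS);
        }
    }
}

This is also why the patch adds the i column to the temporary views: the WHERE i < usefulNumRows filter in the INSERT needs it to separate useful rows from the padding that only keeps the streaming job alive.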