This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3d81240d64 [test] Fix the unstable random tests in PrimaryKeyFileStoreTableITCase (#4933)
3d81240d64 is described below
commit 3d81240d646d6637af86ea2554c192409edcc460
Author: tsreaper <[email protected]>
AuthorDate: Thu Jan 16 19:18:13 2025 +0800
[test] Fix the unstable random tests in PrimaryKeyFileStoreTableITCase (#4933)
---
.../flink/PrimaryKeyFileStoreTableITCase.java | 39 ++++++++++++++++++----
1 file changed, 32 insertions(+), 7 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
index 839a45a6e4..0b243558dd 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
@@ -28,6 +28,8 @@ import org.apache.paimon.utils.FailingFileIO;
import org.apache.paimon.utils.TraceableFileIO;
import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
@@ -37,7 +39,6 @@ import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
@@ -787,7 +788,6 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
testFullCompactionChangelogProducerRandom(bEnv, 1, false);
}
- @Disabled // TODO: fix this unstable test
@Test
@Timeout(TIMEOUT)
public void testFullCompactionChangelogProducerStreamingRandom() throws Exception {
@@ -1088,6 +1088,29 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
tEnv.getConfig()
.getConfiguration()
.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+
+ // We use a large number of rows to mimic unbounded streams because there is a known
+ // consistency issue in bounded streams.
+ //
+ // For bounded streams, if COMPACT snapshot fails to commit when the stream ends (due to
+ // conflict or whatever reasons), we have no chance to modify the compaction result, so the
+ // changelogs produced by compaction will not be committed.
+ //
+ // If it happens in production, users can run another job to compact the table, or run
+ // another job to write more data into the table. These remaining changelogs will be
+ // produced again.
+ int factor;
+ RuntimeExecutionMode mode =
+ tEnv.getConfig().getConfiguration().get(ExecutionOptions.RUNTIME_MODE);
+ if (mode == RuntimeExecutionMode.BATCH) {
+ factor = 1;
+ } else if (mode == RuntimeExecutionMode.STREAMING) {
+ factor = 10;
+ } else {
+ throw new UnsupportedOperationException(
+ "Unknown runtime execution mode " + mode.name());
+ }
+ int usefulNumRows = LIMIT + NUM_PARTS * NUM_KEYS;
tEnv.executeSql(
"CREATE TABLE `default_catalog`.`default_database`.`S` ("
+ " i INT"
@@ -1096,10 +1119,10 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
+ " 'fields.i.kind' = 'sequence',"
+ " 'fields.i.start' = '0',"
+ " 'fields.i.end' = '"
- + (LIMIT + NUM_PARTS * NUM_KEYS - 1)
+ + (usefulNumRows - 1) * factor
+ "',"
+ " 'number-of-rows' = '"
- + (LIMIT + NUM_PARTS * NUM_KEYS)
+ + usefulNumRows * factor
+ "',"
+ " 'rows-per-second' = '"
+ (LIMIT / 20 + ThreadLocalRandom.current().nextInt(LIMIT / 20))
@@ -1129,7 +1152,7 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
String v2Sql = "CAST(i AS STRING) || '.str' AS v2";
tEnv.executeSql(
String.format(
- "CREATE TEMPORARY VIEW myView%d AS SELECT %s, %s, %s, %s FROM `default_catalog`.`default_database`.`S`",
+ "CREATE TEMPORARY VIEW myView%d AS SELECT %s, %s, %s, %s, i FROM `default_catalog`.`default_database`.`S`",
i, ptSql, kSql, v1Sql, v2Sql));
// run test SQL
@@ -1138,8 +1161,10 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
FailingFileIO.retryArtificialException(
() ->
tEnv.executeSql(
- "INSERT INTO T /*+ OPTIONS('sink.parallelism' = '2') */ SELECT * FROM myView"
- + idx));
+ "INSERT INTO T /*+ OPTIONS('sink.parallelism' = '2') */ SELECT pt, k, v1, v2 FROM myView"
+ + idx
+ + " WHERE i < "
+ + usefulNumRows));
results.add(result);
}