This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 1a3c51eb2 [flink] Revert unstable test case FlinkTableSinkITCase.testWalModeWithDefaultMergeEngineAndAggregation (#2215)
1a3c51eb2 is described below

commit 1a3c51eb257d697efeb85a886a81f0b04a11ca7f
Author: Yang Wang <[email protected]>
AuthorDate: Sat Dec 20 15:50:43 2025 +0800

    [flink] Revert unstable test case FlinkTableSinkITCase.testWalModeWithDefaultMergeEngineAndAggregation (#2215)
---
 .../fluss/flink/sink/FlinkTableSinkITCase.java     | 70 ----------------------
 1 file changed, 70 deletions(-)

diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
index c373dd998..c8aeaaae8 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
@@ -1384,74 +1384,4 @@ abstract class FlinkTableSinkITCase extends AbstractTestBase {
             assertResultsIgnoreOrder(rowIter, expectedRows, true);
         }
     }
-
-    @Test
-    void testWalModeWithDefaultMergeEngineAndAggregation() throws Exception {
-        String tableName = "wal_mode_pk_table";
-        // Create a table with WAL mode and default merge engine
-        tEnv.executeSql(
-                String.format(
-                        "create table %s ("
-                                + " id int not null,"
-                                + " category string,"
-                                + " amount bigint,"
-                                + " primary key (id) not enforced"
-                                + ") with ('table.changelog.image' = 'wal')",
-                        tableName));
-
-        // Insert initial data
-        tEnv.executeSql(
-                        String.format(
-                                "INSERT INTO %s VALUES "
-                                        + "(1, 'A', 100), "
-                                        + "(2, 'B', 200), "
-                                        + "(3, 'A', 150), "
-                                        + "(4, 'B', 250)",
-                                tableName))
-                .await();
-
-        // Use batch mode to update and delete records
-        tBatchEnv.executeSql("UPDATE " + tableName + " SET amount = 120 WHERE id = 1").await();
-        tBatchEnv.executeSql("UPDATE " + tableName + " SET amount = 180 WHERE id = 3").await();
-        tBatchEnv.executeSql("DELETE FROM " + tableName + " WHERE id = 4").await();
-
-        // Do aggregation on the table and verify ChangelogNormalize node is generated
-        String aggQuery =
-                String.format(
-                        "SELECT category, SUM(amount) as total_amount FROM %s /*+ OPTIONS('scan.startup.mode' = 'earliest') */ GROUP BY category",
-                        tableName);
-
-        // Explain the aggregation query to check for ChangelogNormalize
-        String aggPlan = tEnv.explainSql(aggQuery);
-        // ChangelogNormalize should be present to normalize the changelog for aggregation
-        // In Flink, when the source produces changelog with primary key semantics (I, UA, D),
-        // a ChangelogNormalize operator is inserted before aggregation
-        assertThat(aggPlan).contains("ChangelogNormalize");
-
-        // Execute the aggregation and verify the result
-        CloseableIterator<Row> aggIter = tEnv.executeSql(aggQuery).collect();
-
-        // Expected aggregation results:
-        // Category A: 120 (id=1) + 180 (id=3) = 300
-        // Category B: 200 (id=2) = 200 (id=4 was deleted)
-        List<String> expectedAggResults =
-                Arrays.asList(
-                        "+I[A, 100]",
-                        "-U[A, 100]",
-                        "+U[A, 250]",
-                        "-U[A, 250]",
-                        "+U[A, 150]",
-                        "-U[A, 150]",
-                        "+U[A, 270]",
-                        "-U[A, 270]",
-                        "+U[A, 120]",
-                        "-U[A, 120]",
-                        "+U[A, 300]",
-                        "+I[B, 250]",
-                        "-D[B, 250]",
-                        "+I[B, 200]");
-
-        // Collect results with timeout
-        assertResultsIgnoreOrder(aggIter, expectedAggResults, true);
-    }
 }

Reply via email to