This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c3cbbb61364 [hotfix][test] Use unique sink name in `CascadedDeltaJoinITCase` and `SinkReuseITCase`
c3cbbb61364 is described below

commit c3cbbb61364ee0e4e257f018a19969bc2f7b51a3
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Mon Apr 27 06:08:30 2026 +0200

    [hotfix][test] Use unique sink name in `CascadedDeltaJoinITCase` and `SinkReuseITCase`
---
 .../runtime/common/sql/SinkReuseITCase.java        | 34 ++++++++-----
 .../stream/sql/CascadedDeltaJoinITCase.scala       | 56 +++++++++++-----------
 2 files changed, 49 insertions(+), 41 deletions(-)

diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/common/sql/SinkReuseITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/common/sql/SinkReuseITCase.java
index bd1c52e31a7..ef5bb991dee 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/common/sql/SinkReuseITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/common/sql/SinkReuseITCase.java
@@ -86,23 +86,26 @@ public class SinkReuseITCase extends AbstractTestBase {
                 TestValuesTableFactory.registerData(
                         Arrays.asList(Row.of(1, 3.1d, "Jack"), Row.of(2, 3.2d, "Rose")));
 
-        createSourceTable("source1", getSourceOptions(dataId1));
-        createSourceTable("source2", getSourceOptions(dataId2));
-        createSourceTable("source3", getSourceOptions(dataId3));
+        createSourceTable("sinkReuseItCaseSource1", getSourceOptions(dataId1));
+        createSourceTable("sinkReuseItCaseSource2", getSourceOptions(dataId2));
+        createSourceTable("sinkReuseItCaseSource3", getSourceOptions(dataId3));
 
-        createSinkTable("sink1", getSinkOptions());
-        createSinkTable("sink2", getSinkOptions());
+        createSinkTable("sinkReuseItCaseSink1", getSinkOptions());
+        createSinkTable("sinkReuseItCaseSink2", getSinkOptions());
     }
 
     @TestTemplate
     public void testSinkMergeFromSameSource() throws Exception {
         setup(isBatch);
         StatementSet statementSet = tEnv.createStatementSet();
-        statementSet.addInsertSql("INSERT INTO sink1 SELECT * FROM source1");
-        statementSet.addInsertSql("INSERT INTO sink1 SELECT * FROM source1");
+        statementSet.addInsertSql(
+                "INSERT INTO sinkReuseItCaseSink1 SELECT * FROM sinkReuseItCaseSource1");
+        statementSet.addInsertSql(
+                "INSERT INTO sinkReuseItCaseSink1 SELECT * FROM sinkReuseItCaseSource1");
         statementSet.execute().await();
 
-        List<String> sink1Result = TestValuesTableFactory.getResultsAsStrings("sink1");
+        List<String> sink1Result =
+                TestValuesTableFactory.getResultsAsStrings("sinkReuseItCaseSink1");
         List<String> sink1Expected =
                 Arrays.asList(
                         "+I[1, 1.1, Tom]",
@@ -116,13 +119,18 @@ public class SinkReuseITCase extends AbstractTestBase {
     public void testMergeSink() throws Exception {
         setup(isBatch);
         StatementSet statementSet = tEnv.createStatementSet();
-        statementSet.addInsertSql("INSERT INTO sink1 SELECT * FROM source1");
-        statementSet.addInsertSql("INSERT INTO sink1 SELECT * FROM source2");
-        statementSet.addInsertSql("INSERT INTO sink2 SELECT * FROM source3");
+        statementSet.addInsertSql(
+                "INSERT INTO sinkReuseItCaseSink1 SELECT * FROM sinkReuseItCaseSource1");
+        statementSet.addInsertSql(
+                "INSERT INTO sinkReuseItCaseSink1 SELECT * FROM sinkReuseItCaseSource2");
+        statementSet.addInsertSql(
+                "INSERT INTO sinkReuseItCaseSink2 SELECT * FROM sinkReuseItCaseSource3");
         statementSet.execute().await();
 
-        List<String> sink1Result = TestValuesTableFactory.getResultsAsStrings("sink1");
-        List<String> sink2Result = TestValuesTableFactory.getResultsAsStrings("sink2");
+        List<String> sink1Result =
+                TestValuesTableFactory.getResultsAsStrings("sinkReuseItCaseSink1");
+        List<String> sink2Result =
+                TestValuesTableFactory.getResultsAsStrings("sinkReuseItCaseSink2");
 
         List<String> sink1Expected =
                 Arrays.asList(
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CascadedDeltaJoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CascadedDeltaJoinITCase.scala
index db680dd1aab..9ee38c7d59a 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CascadedDeltaJoinITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CascadedDeltaJoinITCase.scala
@@ -48,7 +48,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends DeltaJoinITCaseBase(
     AsyncTestValueLookupFunction.invokeCount.set(0)
 
     tEnv.executeSql(s"""
-                       |create table sink1(
+                       |create table cascadeDeltaJoinSink1(
                        |  a1 int,
                        |  c0 double,
                        |  c2 string,
@@ -63,7 +63,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends DeltaJoinITCaseBase(
                        |""".stripMargin)
 
     tEnv.executeSql(s"""
-                       |create table sink2(
+                       |create table cascadeDeltaJoinSink2(
                        |  a1 int,
                        |  c0 double,
                        |  d1 int,
@@ -93,7 +93,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends DeltaJoinITCaseBase(
 
     val sql =
       """
-        |insert into sink1
+        |insert into cascadeDeltaJoinSink1
         |select a1, c0, c2, a2, b2
         | from A
         |join B
@@ -106,7 +106,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends DeltaJoinITCaseBase(
 
     val expected = List("+I[2, 2.0, c-2-2, a-2, b-2-2]", "+I[2, 3.0, c-3, a-2, b-2-2]")
 
-    val result = TestValuesTableFactory.getResultsAsStrings("sink1")
+    val result = TestValuesTableFactory.getResultsAsStrings("cascadeDeltaJoinSink1")
     assertThat(result.sorted).isEqualTo(expected.sorted)
 
     // | DT           | Lookup Count Without Cache  | Lookup Count With Cache |
@@ -132,7 +132,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends DeltaJoinITCaseBase(
 
     val sql =
       """
-        |insert into sink1
+        |insert into cascadeDeltaJoinSink1
         |select a1, c0, c2, a2, b2
         | from A
         |join B
@@ -145,7 +145,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends DeltaJoinITCaseBase(
 
     val expected = List("+I[2, 2.0, c-2-2, a-2, b-2-2]", "+I[2, 3.0, c-3, a-2, b-2-2]")
 
-    val result = TestValuesTableFactory.getResultsAsStrings("sink1")
+    val result = TestValuesTableFactory.getResultsAsStrings("cascadeDeltaJoinSink1")
     assertThat(result.sorted).isEqualTo(expected.sorted)
 
     // | DT           | Lookup Count Without Cache  | Lookup Count With Cache |
@@ -174,7 +174,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends DeltaJoinITCaseBase(
     prepareFourTables()
     val sql =
       """
-        |insert into sink2
+        |insert into cascadeDeltaJoinSink2
         |select a1, c0, d1, c2, d2, a2, b2
         | from A
         |join B
@@ -190,7 +190,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends DeltaJoinITCaseBase(
     val expected =
       List("+I[1, 1.0, 1, c-1, d-2, a-1-2, b-1-2]", "+I[1, 3.0, 1, c-3, d-2, a-1-2, b-1-2]")
 
-    val result = TestValuesTableFactory.getResultsAsStrings("sink2")
+    val result = TestValuesTableFactory.getResultsAsStrings("cascadeDeltaJoinSink2")
     assertThat(result.sorted).isEqualTo(expected.sorted)
 
     // | DT           | Lookup Count Without Cache  | Lookup Count With Cache |
@@ -220,7 +220,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends DeltaJoinITCaseBase(
     prepareFourTables()
     val sql =
       """
-        |insert into sink2
+        |insert into cascadeDeltaJoinSink2
         |select a1, c0, d1, c2, d2, a2, b2
         | from A
         |join B
@@ -236,7 +236,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends DeltaJoinITCaseBase(
     val expected =
       List("+I[1, 1.0, 1, c-1, d-2, a-1-2, b-1-2]", "+I[1, 3.0, 1, c-3, d-2, a-1-2, b-1-2]")
 
-    val result = TestValuesTableFactory.getResultsAsStrings("sink2")
+    val result = TestValuesTableFactory.getResultsAsStrings("cascadeDeltaJoinSink2")
     assertThat(result.sorted).isEqualTo(expected.sorted)
 
     // | DT           | Lookup Count Without Cache  | Lookup Count With Cache |
@@ -266,7 +266,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends DeltaJoinITCaseBase(
     prepareFourTables()
     val sql =
       """
-        |insert into sink2
+        |insert into cascadeDeltaJoinSink2
         |select a1, c0, d1, c2, d2, a2, b2
         | from A
         |join B
@@ -282,7 +282,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends DeltaJoinITCaseBase(
     val expected =
       List("+I[1, 1.0, 1, c-1, d-2, a-1-2, b-1-2]", "+I[1, 3.0, 1, c-3, d-2, a-1-2, b-1-2]")
 
-    val result = TestValuesTableFactory.getResultsAsStrings("sink2")
+    val result = TestValuesTableFactory.getResultsAsStrings("cascadeDeltaJoinSink2")
     assertThat(result.sorted).isEqualTo(expected.sorted)
 
     // | DT           | Lookup Count Without Cache  | Lookup Count With Cache |
@@ -317,7 +317,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends DeltaJoinITCaseBase(
 
     tEnv
       .executeSql("""
-                    |insert into sink1
+                    |insert into cascadeDeltaJoinSink1
                     |select a1, c0, c2, a2, b2
                     | from C
                     |join myv
@@ -327,7 +327,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends DeltaJoinITCaseBase(
 
     val expected = List("+I[3, 32.0, c-3-2, a-3, b-3]")
 
-    val result = TestValuesTableFactory.getResultsAsStrings("sink1")
+    val result = TestValuesTableFactory.getResultsAsStrings("cascadeDeltaJoinSink1")
     assertThat(result.sorted).isEqualTo(expected.sorted)
 
     // | DT           | Lookup Count Without Cache  | Lookup Count With Cache |
@@ -361,7 +361,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends DeltaJoinITCaseBase(
 
     tEnv
       .executeSql("""
-                    |insert into sink1
+                    |insert into cascadeDeltaJoinSink1
                     |select a1, c0, c2, a2, b2
                     | from C
                     |join myv
@@ -371,7 +371,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends DeltaJoinITCaseBase(
 
     val expected = List("+I[3, 32.0, c-3-2, a-3, b-3]")
 
-    val result = TestValuesTableFactory.getResultsAsStrings("sink1")
+    val result = TestValuesTableFactory.getResultsAsStrings("cascadeDeltaJoinSink1")
     assertThat(result.sorted).isEqualTo(expected.sorted)
 
     // | DT           | Lookup Count Without Cache  | Lookup Count With Cache |
@@ -415,7 +415,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends DeltaJoinITCaseBase(
 
     tEnv
       .executeSql("""
-                    |insert into sink2
+                    |insert into cascadeDeltaJoinSink2
                     |select a1, c0, d1, c2, d2, a2, b2
                     | from D
                     |join dt2
@@ -426,7 +426,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends DeltaJoinITCaseBase(
     val expected =
       List("+I[3, 32.0, 3, c-3-2, d-3, a-3, b-3]", "+I[3, 33.0, 3, c-3-3, d-3, a-3, b-3]")
 
-    val result = TestValuesTableFactory.getResultsAsStrings("sink2")
+    val result = TestValuesTableFactory.getResultsAsStrings("cascadeDeltaJoinSink2")
     assertThat(result.sorted).isEqualTo(expected.sorted)
 
     // | DT           | Lookup Count Without Cache  | Lookup Count With Cache |
@@ -471,7 +471,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends DeltaJoinITCaseBase(
 
     tEnv
       .executeSql("""
-                    |insert into sink2
+                    |insert into cascadeDeltaJoinSink2
                     |select a1, c0, d1, c2, d2, a2, b2
                     | from D
                     |join dt2
@@ -482,7 +482,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends DeltaJoinITCaseBase(
     val expected =
       List("+I[3, 32.0, 3, c-3-2, d-3, a-3, b-3]", "+I[3, 33.0, 3, c-3-3, d-3, a-3, b-3]")
 
-    val result = TestValuesTableFactory.getResultsAsStrings("sink2")
+    val result = TestValuesTableFactory.getResultsAsStrings("cascadeDeltaJoinSink2")
     assertThat(result.sorted).isEqualTo(expected.sorted)
 
     // | DT           | Lookup Count Without Cache  | Lookup Count With Cache |
@@ -527,7 +527,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends DeltaJoinITCaseBase(
 
     tEnv
       .executeSql("""
-                    |insert into sink2
+                    |insert into cascadeDeltaJoinSink2
                     |select a1, c0, d1, c2, d2, a2, b2
                     | from D
                     |join dt2
@@ -538,7 +538,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends DeltaJoinITCaseBase(
     val expected =
       List("+I[3, 32.0, 3, c-3-2, d-3, a-3, b-3]", "+I[3, 33.0, 3, c-3-3, d-3, a-3, b-3]")
 
-    val result = TestValuesTableFactory.getResultsAsStrings("sink2")
+    val result = TestValuesTableFactory.getResultsAsStrings("cascadeDeltaJoinSink2")
     assertThat(result.sorted).isEqualTo(expected.sorted)
 
     // | DT           | Lookup Count Without Cache  | Lookup Count With Cache |
@@ -582,7 +582,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends DeltaJoinITCaseBase(
     tEnv
       .executeSql(
         """
-          |insert into sink2
+          |insert into cascadeDeltaJoinSink2
           |select a1, c0, d1, c2, d2, a2, b2
           | from dt1
           |join dt2
@@ -593,7 +593,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends DeltaJoinITCaseBase(
 
     val expected = List("+I[3, 33.0, 3, c-3-3, d-3, a-3, b-3]")
 
-    val result = TestValuesTableFactory.getResultsAsStrings("sink2")
+    val result = TestValuesTableFactory.getResultsAsStrings("cascadeDeltaJoinSink2")
     assertThat(result.sorted).isEqualTo(expected.sorted)
 
     // | DT           | Lookup Count Without Cache  | Lookup Count With Cache |
@@ -637,7 +637,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends DeltaJoinITCaseBase(
     tEnv
       .executeSql(
         """
-          |insert into sink2
+          |insert into cascadeDeltaJoinSink2
           |select a1, c0, d1, c2, d2, a2, b2
           | from dt1
           |join dt2
@@ -648,7 +648,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends DeltaJoinITCaseBase(
 
     val expected = List("+I[3, 33.0, 3, c-3-3, d-3, a-3, b-3]")
 
-    val result = TestValuesTableFactory.getResultsAsStrings("sink2")
+    val result = TestValuesTableFactory.getResultsAsStrings("cascadeDeltaJoinSink2")
     assertThat(result.sorted).isEqualTo(expected.sorted)
 
     // | DT           | Lookup Count Without Cache  | Lookup Count With Cache |
@@ -686,7 +686,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends DeltaJoinITCaseBase(
 
     val sql =
       """
-        |insert into sink1
+        |insert into cascadeDeltaJoinSink1
         |select a1, c0, c2, new_a2, ab2
         | from dt1
         |join C
@@ -696,7 +696,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends DeltaJoinITCaseBase(
 
     val expected = List("+I[2, 2.0, c-2-2, a-2, a-2~b-2-2]", "+I[3, 32.0, c-3-2, a-3, a-3~b-3]")
 
-    val result = TestValuesTableFactory.getResultsAsStrings("sink1")
+    val result = TestValuesTableFactory.getResultsAsStrings("cascadeDeltaJoinSink1")
     assertThat(result.sorted).isEqualTo(expected.sorted)
 
     // | DT           | Lookup Count Without Cache  | Lookup Count With Cache |

Reply via email to