This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c3cbbb61364 [hotfix][test] Use unique sink name in
`CascadedDeltaJoinITCase` and `SinkReuseITCase`
c3cbbb61364 is described below
commit c3cbbb61364ee0e4e257f018a19969bc2f7b51a3
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Mon Apr 27 06:08:30 2026 +0200
[hotfix][test] Use unique sink name in `CascadedDeltaJoinITCase` and
`SinkReuseITCase`
---
.../runtime/common/sql/SinkReuseITCase.java | 34 ++++++++-----
.../stream/sql/CascadedDeltaJoinITCase.scala | 56 +++++++++++-----------
2 files changed, 49 insertions(+), 41 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/common/sql/SinkReuseITCase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/common/sql/SinkReuseITCase.java
index bd1c52e31a7..ef5bb991dee 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/common/sql/SinkReuseITCase.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/common/sql/SinkReuseITCase.java
@@ -86,23 +86,26 @@ public class SinkReuseITCase extends AbstractTestBase {
TestValuesTableFactory.registerData(
Arrays.asList(Row.of(1, 3.1d, "Jack"), Row.of(2, 3.2d,
"Rose")));
- createSourceTable("source1", getSourceOptions(dataId1));
- createSourceTable("source2", getSourceOptions(dataId2));
- createSourceTable("source3", getSourceOptions(dataId3));
+ createSourceTable("sinkReuseItCaseSource1", getSourceOptions(dataId1));
+ createSourceTable("sinkReuseItCaseSource2", getSourceOptions(dataId2));
+ createSourceTable("sinkReuseItCaseSource3", getSourceOptions(dataId3));
- createSinkTable("sink1", getSinkOptions());
- createSinkTable("sink2", getSinkOptions());
+ createSinkTable("sinkReuseItCaseSink1", getSinkOptions());
+ createSinkTable("sinkReuseItCaseSink2", getSinkOptions());
}
@TestTemplate
public void testSinkMergeFromSameSource() throws Exception {
setup(isBatch);
StatementSet statementSet = tEnv.createStatementSet();
- statementSet.addInsertSql("INSERT INTO sink1 SELECT * FROM source1");
- statementSet.addInsertSql("INSERT INTO sink1 SELECT * FROM source1");
+ statementSet.addInsertSql(
+ "INSERT INTO sinkReuseItCaseSink1 SELECT * FROM
sinkReuseItCaseSource1");
+ statementSet.addInsertSql(
+ "INSERT INTO sinkReuseItCaseSink1 SELECT * FROM
sinkReuseItCaseSource1");
statementSet.execute().await();
- List<String> sink1Result =
TestValuesTableFactory.getResultsAsStrings("sink1");
+ List<String> sink1Result =
+
TestValuesTableFactory.getResultsAsStrings("sinkReuseItCaseSink1");
List<String> sink1Expected =
Arrays.asList(
"+I[1, 1.1, Tom]",
@@ -116,13 +119,18 @@ public class SinkReuseITCase extends AbstractTestBase {
public void testMergeSink() throws Exception {
setup(isBatch);
StatementSet statementSet = tEnv.createStatementSet();
- statementSet.addInsertSql("INSERT INTO sink1 SELECT * FROM source1");
- statementSet.addInsertSql("INSERT INTO sink1 SELECT * FROM source2");
- statementSet.addInsertSql("INSERT INTO sink2 SELECT * FROM source3");
+ statementSet.addInsertSql(
+ "INSERT INTO sinkReuseItCaseSink1 SELECT * FROM
sinkReuseItCaseSource1");
+ statementSet.addInsertSql(
+ "INSERT INTO sinkReuseItCaseSink1 SELECT * FROM
sinkReuseItCaseSource2");
+ statementSet.addInsertSql(
+ "INSERT INTO sinkReuseItCaseSink2 SELECT * FROM
sinkReuseItCaseSource3");
statementSet.execute().await();
- List<String> sink1Result =
TestValuesTableFactory.getResultsAsStrings("sink1");
- List<String> sink2Result =
TestValuesTableFactory.getResultsAsStrings("sink2");
+ List<String> sink1Result =
+
TestValuesTableFactory.getResultsAsStrings("sinkReuseItCaseSink1");
+ List<String> sink2Result =
+
TestValuesTableFactory.getResultsAsStrings("sinkReuseItCaseSink2");
List<String> sink1Expected =
Arrays.asList(
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CascadedDeltaJoinITCase.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CascadedDeltaJoinITCase.scala
index db680dd1aab..9ee38c7d59a 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CascadedDeltaJoinITCase.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CascadedDeltaJoinITCase.scala
@@ -48,7 +48,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends
DeltaJoinITCaseBase(
AsyncTestValueLookupFunction.invokeCount.set(0)
tEnv.executeSql(s"""
- |create table sink1(
+ |create table cascadeDeltaJoinSink1(
| a1 int,
| c0 double,
| c2 string,
@@ -63,7 +63,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends
DeltaJoinITCaseBase(
|""".stripMargin)
tEnv.executeSql(s"""
- |create table sink2(
+ |create table cascadeDeltaJoinSink2(
| a1 int,
| c0 double,
| d1 int,
@@ -93,7 +93,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends
DeltaJoinITCaseBase(
val sql =
"""
- |insert into sink1
+ |insert into cascadeDeltaJoinSink1
|select a1, c0, c2, a2, b2
| from A
|join B
@@ -106,7 +106,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends
DeltaJoinITCaseBase(
val expected = List("+I[2, 2.0, c-2-2, a-2, b-2-2]", "+I[2, 3.0, c-3, a-2,
b-2-2]")
- val result = TestValuesTableFactory.getResultsAsStrings("sink1")
+ val result =
TestValuesTableFactory.getResultsAsStrings("cascadeDeltaJoinSink1")
assertThat(result.sorted).isEqualTo(expected.sorted)
// | DT | Lookup Count Without Cache | Lookup Count With Cache |
@@ -132,7 +132,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends
DeltaJoinITCaseBase(
val sql =
"""
- |insert into sink1
+ |insert into cascadeDeltaJoinSink1
|select a1, c0, c2, a2, b2
| from A
|join B
@@ -145,7 +145,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends
DeltaJoinITCaseBase(
val expected = List("+I[2, 2.0, c-2-2, a-2, b-2-2]", "+I[2, 3.0, c-3, a-2,
b-2-2]")
- val result = TestValuesTableFactory.getResultsAsStrings("sink1")
+ val result =
TestValuesTableFactory.getResultsAsStrings("cascadeDeltaJoinSink1")
assertThat(result.sorted).isEqualTo(expected.sorted)
// | DT | Lookup Count Without Cache | Lookup Count With Cache |
@@ -174,7 +174,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends
DeltaJoinITCaseBase(
prepareFourTables()
val sql =
"""
- |insert into sink2
+ |insert into cascadeDeltaJoinSink2
|select a1, c0, d1, c2, d2, a2, b2
| from A
|join B
@@ -190,7 +190,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends
DeltaJoinITCaseBase(
val expected =
List("+I[1, 1.0, 1, c-1, d-2, a-1-2, b-1-2]", "+I[1, 3.0, 1, c-3, d-2,
a-1-2, b-1-2]")
- val result = TestValuesTableFactory.getResultsAsStrings("sink2")
+ val result =
TestValuesTableFactory.getResultsAsStrings("cascadeDeltaJoinSink2")
assertThat(result.sorted).isEqualTo(expected.sorted)
// | DT | Lookup Count Without Cache | Lookup Count With Cache |
@@ -220,7 +220,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends
DeltaJoinITCaseBase(
prepareFourTables()
val sql =
"""
- |insert into sink2
+ |insert into cascadeDeltaJoinSink2
|select a1, c0, d1, c2, d2, a2, b2
| from A
|join B
@@ -236,7 +236,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends
DeltaJoinITCaseBase(
val expected =
List("+I[1, 1.0, 1, c-1, d-2, a-1-2, b-1-2]", "+I[1, 3.0, 1, c-3, d-2,
a-1-2, b-1-2]")
- val result = TestValuesTableFactory.getResultsAsStrings("sink2")
+ val result =
TestValuesTableFactory.getResultsAsStrings("cascadeDeltaJoinSink2")
assertThat(result.sorted).isEqualTo(expected.sorted)
// | DT | Lookup Count Without Cache | Lookup Count With Cache |
@@ -266,7 +266,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends
DeltaJoinITCaseBase(
prepareFourTables()
val sql =
"""
- |insert into sink2
+ |insert into cascadeDeltaJoinSink2
|select a1, c0, d1, c2, d2, a2, b2
| from A
|join B
@@ -282,7 +282,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends
DeltaJoinITCaseBase(
val expected =
List("+I[1, 1.0, 1, c-1, d-2, a-1-2, b-1-2]", "+I[1, 3.0, 1, c-3, d-2,
a-1-2, b-1-2]")
- val result = TestValuesTableFactory.getResultsAsStrings("sink2")
+ val result =
TestValuesTableFactory.getResultsAsStrings("cascadeDeltaJoinSink2")
assertThat(result.sorted).isEqualTo(expected.sorted)
// | DT | Lookup Count Without Cache | Lookup Count With Cache |
@@ -317,7 +317,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends
DeltaJoinITCaseBase(
tEnv
.executeSql("""
- |insert into sink1
+ |insert into cascadeDeltaJoinSink1
|select a1, c0, c2, a2, b2
| from C
|join myv
@@ -327,7 +327,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends
DeltaJoinITCaseBase(
val expected = List("+I[3, 32.0, c-3-2, a-3, b-3]")
- val result = TestValuesTableFactory.getResultsAsStrings("sink1")
+ val result =
TestValuesTableFactory.getResultsAsStrings("cascadeDeltaJoinSink1")
assertThat(result.sorted).isEqualTo(expected.sorted)
// | DT | Lookup Count Without Cache | Lookup Count With Cache |
@@ -361,7 +361,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends
DeltaJoinITCaseBase(
tEnv
.executeSql("""
- |insert into sink1
+ |insert into cascadeDeltaJoinSink1
|select a1, c0, c2, a2, b2
| from C
|join myv
@@ -371,7 +371,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends
DeltaJoinITCaseBase(
val expected = List("+I[3, 32.0, c-3-2, a-3, b-3]")
- val result = TestValuesTableFactory.getResultsAsStrings("sink1")
+ val result =
TestValuesTableFactory.getResultsAsStrings("cascadeDeltaJoinSink1")
assertThat(result.sorted).isEqualTo(expected.sorted)
// | DT | Lookup Count Without Cache | Lookup Count With Cache |
@@ -415,7 +415,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends
DeltaJoinITCaseBase(
tEnv
.executeSql("""
- |insert into sink2
+ |insert into cascadeDeltaJoinSink2
|select a1, c0, d1, c2, d2, a2, b2
| from D
|join dt2
@@ -426,7 +426,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends
DeltaJoinITCaseBase(
val expected =
List("+I[3, 32.0, 3, c-3-2, d-3, a-3, b-3]", "+I[3, 33.0, 3, c-3-3, d-3,
a-3, b-3]")
- val result = TestValuesTableFactory.getResultsAsStrings("sink2")
+ val result =
TestValuesTableFactory.getResultsAsStrings("cascadeDeltaJoinSink2")
assertThat(result.sorted).isEqualTo(expected.sorted)
// | DT | Lookup Count Without Cache | Lookup Count With Cache |
@@ -471,7 +471,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends
DeltaJoinITCaseBase(
tEnv
.executeSql("""
- |insert into sink2
+ |insert into cascadeDeltaJoinSink2
|select a1, c0, d1, c2, d2, a2, b2
| from D
|join dt2
@@ -482,7 +482,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends
DeltaJoinITCaseBase(
val expected =
List("+I[3, 32.0, 3, c-3-2, d-3, a-3, b-3]", "+I[3, 33.0, 3, c-3-3, d-3,
a-3, b-3]")
- val result = TestValuesTableFactory.getResultsAsStrings("sink2")
+ val result =
TestValuesTableFactory.getResultsAsStrings("cascadeDeltaJoinSink2")
assertThat(result.sorted).isEqualTo(expected.sorted)
// | DT | Lookup Count Without Cache | Lookup Count With Cache |
@@ -527,7 +527,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends
DeltaJoinITCaseBase(
tEnv
.executeSql("""
- |insert into sink2
+ |insert into cascadeDeltaJoinSink2
|select a1, c0, d1, c2, d2, a2, b2
| from D
|join dt2
@@ -538,7 +538,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends
DeltaJoinITCaseBase(
val expected =
List("+I[3, 32.0, 3, c-3-2, d-3, a-3, b-3]", "+I[3, 33.0, 3, c-3-3, d-3,
a-3, b-3]")
- val result = TestValuesTableFactory.getResultsAsStrings("sink2")
+ val result =
TestValuesTableFactory.getResultsAsStrings("cascadeDeltaJoinSink2")
assertThat(result.sorted).isEqualTo(expected.sorted)
// | DT | Lookup Count Without Cache | Lookup Count With Cache |
@@ -582,7 +582,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends
DeltaJoinITCaseBase(
tEnv
.executeSql(
"""
- |insert into sink2
+ |insert into cascadeDeltaJoinSink2
|select a1, c0, d1, c2, d2, a2, b2
| from dt1
|join dt2
@@ -593,7 +593,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends
DeltaJoinITCaseBase(
val expected = List("+I[3, 33.0, 3, c-3-3, d-3, a-3, b-3]")
- val result = TestValuesTableFactory.getResultsAsStrings("sink2")
+ val result =
TestValuesTableFactory.getResultsAsStrings("cascadeDeltaJoinSink2")
assertThat(result.sorted).isEqualTo(expected.sorted)
// | DT | Lookup Count Without Cache | Lookup Count With Cache |
@@ -637,7 +637,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends
DeltaJoinITCaseBase(
tEnv
.executeSql(
"""
- |insert into sink2
+ |insert into cascadeDeltaJoinSink2
|select a1, c0, d1, c2, d2, a2, b2
| from dt1
|join dt2
@@ -648,7 +648,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends
DeltaJoinITCaseBase(
val expected = List("+I[3, 33.0, 3, c-3-3, d-3, a-3, b-3]")
- val result = TestValuesTableFactory.getResultsAsStrings("sink2")
+ val result =
TestValuesTableFactory.getResultsAsStrings("cascadeDeltaJoinSink2")
assertThat(result.sorted).isEqualTo(expected.sorted)
// | DT | Lookup Count Without Cache | Lookup Count With Cache |
@@ -686,7 +686,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends
DeltaJoinITCaseBase(
val sql =
"""
- |insert into sink1
+ |insert into cascadeDeltaJoinSink1
|select a1, c0, c2, new_a2, ab2
| from dt1
|join C
@@ -696,7 +696,7 @@ class CascadedDeltaJoinITCase(enableCache: Boolean) extends
DeltaJoinITCaseBase(
val expected = List("+I[2, 2.0, c-2-2, a-2, a-2~b-2-2]", "+I[3, 32.0,
c-3-2, a-3, a-3~b-3]")
- val result = TestValuesTableFactory.getResultsAsStrings("sink1")
+ val result =
TestValuesTableFactory.getResultsAsStrings("cascadeDeltaJoinSink1")
assertThat(result.sorted).isEqualTo(expected.sorted)
// | DT | Lookup Count Without Cache | Lookup Count With Cache |