This is an automated email from the ASF dual-hosted git repository.

jchan pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.18 by this push:
     new d903990e1ed [FLINK-34115][table-planner] Fix TableAggregateITCase 
unstable test
d903990e1ed is described below

commit d903990e1ede423e92b6d6ec93876500519aab14
Author: Jane Chan <qingyue....@gmail.com>
AuthorDate: Tue Jan 30 15:56:02 2024 +0800

    [FLINK-34115][table-planner] Fix TableAggregateITCase unstable test
    
    This closes #24222
---
 .../stream/table/TableAggregateITCase.scala        | 28 +++++++++++-----------
 .../table/planner/runtime/utils/TestData.scala     |  8 +++++++
 2 files changed, 22 insertions(+), 14 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableAggregateITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableAggregateITCase.scala
index d90bdfa9232..cfc828545ab 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableAggregateITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableAggregateITCase.scala
@@ -21,6 +21,8 @@ import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api._
 import org.apache.flink.table.api.bridge.scala._
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
+import 
org.apache.flink.table.planner.runtime.utils.{JavaUserDefinedTableAggFunctions, 
StreamingWithStateTestBase, TestData, TestingRetractSink}
 import 
org.apache.flink.table.planner.runtime.utils.{JavaUserDefinedTableAggFunctions, 
StreamingWithStateTestBase, TestingRetractSink}
 import 
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedAggFunctions.OverloadedDoubleMaxFunction
 import 
org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
@@ -37,23 +39,20 @@ import org.junit.runners.Parameterized
 @RunWith(classOf[Parameterized])
 class TableAggregateITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase(mode) {
 
-  var myTable: Table = _
-
   @Before
   override def before(): Unit = {
     super.before()
     tEnv.getConfig.setIdleStateRetentionTime(Time.hours(1), Time.hours(2))
-    myTable = tEnv.fromValues(
-      DataTypes.ROW(
-        DataTypes.FIELD("id", DataTypes.INT),
-        DataTypes.FIELD("name", DataTypes.STRING),
-        DataTypes.FIELD("price", DataTypes.INT)),
-      row(1, "Latte", 6: java.lang.Integer),
-      row(2, "Milk", 3: java.lang.Integer),
-      row(3, "Breve", 5: java.lang.Integer),
-      row(4, "Mocha", 8: java.lang.Integer),
-      row(5, "Tea", 4: java.lang.Integer)
-    )
+    tEnv.executeSql(s"""
+                       |CREATE TABLE myTable (
+                       |  `id` INT,
+                       |  `name` STRING,
+                       |  `price` INT
+                       |) WITH (
+                       |  'connector' = 'values',
+                       |  'data-id' = 
'${TestValuesTableFactory.registerData(TestData.tupleData4)}'
+                       |)
+                       |""".stripMargin)
   }
 
   @Test
@@ -115,7 +114,8 @@ class TableAggregateITCase(mode: StateBackendMode) extends 
StreamingWithStateTes
 
   def checkRank(func: String, expectedResult: List[String]): Unit = {
     val resultTable =
-      myTable
+      tEnv
+        .from("myTable")
         .flatAggregate(call(func, $("price")).as("top_price", "rank"))
         .select($("top_price"), $("rank"))
 
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TestData.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TestData.scala
index 2ac34b05e05..0c23c276d16 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TestData.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TestData.scala
@@ -283,6 +283,14 @@ object TestData {
 
   lazy val data3: Seq[Row] = tupleData3.map(d => row(d.productIterator.toList: 
_*))
 
+  lazy val tupleData4: Seq[Row] = Seq(
+    row(1, "Latte", 6),
+    row(2, "Milk", 3),
+    row(3, "Breve", 5),
+    row(4, "Mocha", 8),
+    row(5, "Tea", 4)
+  )
+
   val nullablesOfData3 = Array(true, true, true)
 
   val nullablesOfData4 = Array(true, true, true)

Reply via email to