This is an automated email from the ASF dual-hosted git repository. jchan pushed a commit to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.18 by this push:
     new d903990e1ed [FLINK-34115][table-planner] Fix TableAggregateITCase unstable test
d903990e1ed is described below

commit d903990e1ede423e92b6d6ec93876500519aab14
Author: Jane Chan <qingyue....@gmail.com>
AuthorDate: Tue Jan 30 15:56:02 2024 +0800

    [FLINK-34115][table-planner] Fix TableAggregateITCase unstable test

    This closes #24222
---
 .../stream/table/TableAggregateITCase.scala | 28 +++++++++++-----------
 .../table/planner/runtime/utils/TestData.scala | 8 +++++++
 2 files changed, 22 insertions(+), 14 deletions(-)

diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableAggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableAggregateITCase.scala
index d90bdfa9232..cfc828545ab 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableAggregateITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableAggregateITCase.scala
@@ -21,6 +21,8 @@ import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api._
 import org.apache.flink.table.api.bridge.scala._
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
+import org.apache.flink.table.planner.runtime.utils.{JavaUserDefinedTableAggFunctions, StreamingWithStateTestBase, TestData, TestingRetractSink}
 import org.apache.flink.table.planner.runtime.utils.{JavaUserDefinedTableAggFunctions, StreamingWithStateTestBase, TestingRetractSink}
 import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedAggFunctions.OverloadedDoubleMaxFunction
 import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
@@ -37,23 +39,20 @@ import org.junit.runners.Parameterized
 @RunWith(classOf[Parameterized])
 class TableAggregateITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode) {
 
-  var myTable: Table = _
-
   @Before
   override def before(): Unit = {
     super.before()
     tEnv.getConfig.setIdleStateRetentionTime(Time.hours(1), Time.hours(2))
-    myTable = tEnv.fromValues(
-      DataTypes.ROW(
-        DataTypes.FIELD("id", DataTypes.INT),
-        DataTypes.FIELD("name", DataTypes.STRING),
-        DataTypes.FIELD("price", DataTypes.INT)),
-      row(1, "Latte", 6: java.lang.Integer),
-      row(2, "Milk", 3: java.lang.Integer),
-      row(3, "Breve", 5: java.lang.Integer),
-      row(4, "Mocha", 8: java.lang.Integer),
-      row(5, "Tea", 4: java.lang.Integer)
-    )
+    tEnv.executeSql(s"""
+                       |CREATE TABLE myTable (
+                       |  `id` INT,
+                       |  `name` STRING,
+                       |  `price` INT
+                       |) WITH (
+                       |  'connector' = 'values',
+                       |  'data-id' = '${TestValuesTableFactory.registerData(TestData.tupleData4)}'
+                       |)
+                       |""".stripMargin)
   }
 
   @Test
@@ -115,7 +114,8 @@ class TableAggregateITCase(mode: StateBackendMode) extends StreamingWithStateTes
 
   def checkRank(func: String, expectedResult: List[String]): Unit = {
     val resultTable =
-      myTable
+      tEnv
+        .from("myTable")
         .flatAggregate(call(func, $("price")).as("top_price", "rank"))
         .select($("top_price"), $("rank"))
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TestData.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TestData.scala
index 2ac34b05e05..0c23c276d16 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TestData.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TestData.scala
@@ -283,6 +283,14 @@ object TestData {
 
   lazy val data3: Seq[Row] = tupleData3.map(d => row(d.productIterator.toList: _*))
 
+  lazy val tupleData4: Seq[Row] = Seq(
+    row(1, "Latte", 6),
+    row(2, "Milk", 3),
+    row(3, "Breve", 5),
+    row(4, "Mocha", 8),
+    row(5, "Tea", 4)
+  )
+
   val nullablesOfData3 = Array(true, true, true)
 
   val nullablesOfData4 = Array(true, true, true)