This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 876ad59bf2d [hotfix][tests] Replace removed in 2.13 `MutableList`
876ad59bf2d is described below

commit 876ad59bf2d8732dd98f3355e1a47a0a2c66c8a5
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Fri May 1 09:11:58 2026 +0200

    [hotfix][tests] Replace removed in 2.13 `MutableList`
---
 .../TraversableSerializerUpgradeTest.scala         |  24 ++---
 .../flink/table/api/TableEnvironmentITCase.scala   |   4 +-
 .../planner/expressions/NonDeterministicTest.scala |   2 +-
 .../runtime/batch/sql/TimestampITCase.scala        |   6 +-
 .../runtime/batch/table/AggregationITCase.scala    |   6 +-
 .../planner/runtime/batch/table/CalcITCase.scala   |   6 +-
 .../runtime/batch/table/CorrelateITCase.scala      |   2 +-
 .../runtime/batch/table/SetOperatorsITCase.scala   |   6 +-
 .../harness/GroupAggregateHarnessTest.scala        |   6 +-
 .../runtime/harness/OverAggregateHarnessTest.scala |  14 +--
 .../planner/runtime/harness/RankHarnessTest.scala  |   6 +-
 .../harness/TableAggregateHarnessTest.scala        |   2 +-
 .../runtime/stream/sql/AggregateITCase.scala       |  68 ++++++------
 .../runtime/stream/sql/AggregateRemoveITCase.scala |   2 +-
 .../runtime/stream/sql/CorrelateITCase.scala       |   2 +-
 .../runtime/stream/sql/DeduplicateITCase.scala     |   2 +-
 .../runtime/stream/sql/IntervalJoinITCase.scala    | 114 ++++++++++-----------
 .../planner/runtime/stream/sql/JoinITCase.scala    |  66 ++++++------
 .../runtime/stream/sql/MatchRecognizeITCase.scala  |  40 ++++----
 .../runtime/stream/sql/OverAggregateITCase.scala   |   6 +-
 .../runtime/stream/sql/SetOperatorsITCase.scala    |  12 +--
 .../planner/runtime/stream/sql/SortITCase.scala    |  14 +--
 .../runtime/stream/sql/SplitAggregateITCase.scala  |   2 +-
 .../stream/sql/StreamFileSystemITCaseBase.scala    |   2 +-
 .../runtime/stream/sql/TableSourceITCase.scala     |   2 +-
 .../sql/TemporalTableFunctionJoinITCase.scala      |  18 ++--
 .../runtime/stream/sql/TimestampITCase.scala       |   4 +-
 .../runtime/stream/table/AggregateITCase.scala     |  40 ++++----
 .../planner/runtime/stream/table/CalcITCase.scala  |  46 ++++-----
 .../runtime/stream/table/CorrelateITCase.scala     |  22 ++--
 .../runtime/stream/table/OverAggregateITCase.scala |  12 +--
 .../runtime/stream/table/SetOperatorsITCase.scala  |   8 +-
 .../runtime/utils/CollectionBatchExecTable.scala   |  36 +++----
 .../runtime/utils/StreamingWithStateTestBase.scala |   2 +-
 .../table/planner/runtime/utils/TestData.scala     |  16 +--
 35 files changed, 310 insertions(+), 310 deletions(-)

diff --git a/flink-table/flink-table-api-scala/src/test/scala/org/apache/flink/table/api/typeutils/TraversableSerializerUpgradeTest.scala b/flink-table/flink-table-api-scala/src/test/scala/org/apache/flink/table/api/typeutils/TraversableSerializerUpgradeTest.scala
index 02bed7a8cc8..2a89f5f7823 100644
--- a/flink-table/flink-table-api-scala/src/test/scala/org/apache/flink/table/api/typeutils/TraversableSerializerUpgradeTest.scala
+++ b/flink-table/flink-table-api-scala/src/test/scala/org/apache/flink/table/api/typeutils/TraversableSerializerUpgradeTest.scala
@@ -70,7 +70,7 @@ class TraversableSerializerUpgradeTest
         classOf[MapSerializerSetup],
         classOf[MapSerializerVerifier]))
     testSpecifications.add(
-      new TestSpecification[mutable.MutableList[Int], mutable.MutableList[Int]](
+      new TestSpecification[mutable.ListBuffer[Int], mutable.ListBuffer[Int]](
         "traversable-serializer-mutable-list",
         migrationVersion,
         classOf[MutableListSerializerSetup],
@@ -131,7 +131,7 @@ object TraversableSerializerUpgradeTest {
     val setTypeInfo = implicitly[TypeInformation[Set[Int]]]
     val bitsetTypeInfo = implicitly[TypeInformation[BitSet]]
     val mutableListTypeInfo =
-      implicitly[TypeInformation[mutable.MutableList[Int]]]
+      implicitly[TypeInformation[mutable.ListBuffer[Int]]]
     val seqTupleTypeInfo = implicitly[TypeInformation[Seq[(Int, String)]]]
     val seqPojoTypeInfo = implicitly[TypeInformation[Seq[Pojo]]]
   }
@@ -225,26 +225,26 @@ object TraversableSerializerUpgradeTest {
   }
 
   final class MutableListSerializerSetup
-    extends TypeSerializerUpgradeTestBase.PreUpgradeSetup[mutable.MutableList[Int]] {
-    override def createPriorSerializer: TypeSerializer[mutable.MutableList[Int]] =
+    extends TypeSerializerUpgradeTestBase.PreUpgradeSetup[mutable.ListBuffer[Int]] {
+    override def createPriorSerializer: TypeSerializer[mutable.ListBuffer[Int]] =
       new TypeSerializerSupplier(mutableListTypeInfo).get()
 
-    override def createTestData: mutable.MutableList[Int] = mutable.MutableList(1, 2, 3)
+    override def createTestData: mutable.ListBuffer[Int] = mutable.ListBuffer(1, 2, 3)
   }
 
   final class MutableListSerializerVerifier
-    extends TypeSerializerUpgradeTestBase.UpgradeVerifier[mutable.MutableList[Int]] {
-    override def createUpgradedSerializer: TypeSerializer[mutable.MutableList[Int]] =
+    extends TypeSerializerUpgradeTestBase.UpgradeVerifier[mutable.ListBuffer[Int]] {
+    override def createUpgradedSerializer: TypeSerializer[mutable.ListBuffer[Int]] =
       new TypeSerializerSupplier(mutableListTypeInfo).get()
 
-    override def testDataCondition: Condition[mutable.MutableList[Int]] =
-      new Condition[mutable.MutableList[Int]](
-        (l: mutable.MutableList[Int]) => Objects.equals(l, mutable.MutableList(1, 2, 3)),
+    override def testDataCondition: Condition[mutable.ListBuffer[Int]] =
+      new Condition[mutable.ListBuffer[Int]](
+        (l: mutable.ListBuffer[Int]) => Objects.equals(l, mutable.ListBuffer(1, 2, 3)),
         "")
 
     override def schemaCompatibilityCondition(version: FlinkVersion)
-        : Condition[TypeSerializerSchemaCompatibility[mutable.MutableList[Int]]] =
-      TypeSerializerConditions.isCompatibleAsIs[mutable.MutableList[Int]]()
+        : Condition[TypeSerializerSchemaCompatibility[mutable.ListBuffer[Int]]] =
+      TypeSerializerConditions.isCompatibleAsIs[mutable.ListBuffer[Int]]()
   }
 
   final class SeqSerializerSetup extends TypeSerializerUpgradeTestBase.PreUpgradeSetup[Seq[Int]] {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
index ce029217766..b6eb07d6590 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
@@ -714,7 +714,7 @@ class TableEnvironmentITCase(tableEnvName: String, 
isStreaming: Boolean) {
   }
 
   def getPersonData: List[(String, Int, Double, String)] = {
-    val data = new mutable.MutableList[(String, Int, Double, String)]
+    val data = new mutable.ListBuffer[(String, Int, Double, String)]
     data.+=(("Mike", 1, 12.3, "Smith"))
     data.+=(("Bob", 2, 45.6, "Taylor"))
     data.+=(("Sam", 3, 7.89, "Miller"))
@@ -747,7 +747,7 @@ class TableEnvironmentITCase(tableEnvName: String, 
isStreaming: Boolean) {
 
   private def getTableData(tEnv: TableEnvironment, tableName: String): 
List[String] = {
     val iterator = tEnv.executeSql(s"select * from $tableName").collect()
-    val result = new mutable.MutableList[String]
+    val result = new mutable.ListBuffer[String]
     try {
       while (iterator.hasNext) {
         result.+=(TestSinkUtil.rowToString(iterator.next()))
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTest.scala
index 80691a3e8dd..d8b4f7832c4 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTest.scala
@@ -102,7 +102,7 @@ class NonDeterministicTest(isStreaming: Boolean) extends 
ExpressionTestBase(isSt
         "LOCALTIME",
         "LOCALTIMESTAMP"))
 
-    val expected = mutable.MutableList[String](
+    val expected = mutable.ListBuffer[String](
       "1970-01-01",
       "08:00:01",
       "1970-01-01 08:00:01.123",
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TimestampITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TimestampITCase.scala
index 2007134d886..0c738bb07cd 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TimestampITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TimestampITCase.scala
@@ -72,7 +72,7 @@ class TimestampITCase extends BatchTestBase {
       null
     )
 
-    val instantsOfDateTime = new mutable.MutableList[Instant]
+    val instantsOfDateTime = new mutable.ListBuffer[Instant]
     for (i <- datetimes.indices) {
       if (datetimes(i) == null) {
         instantsOfDateTime += null
@@ -83,7 +83,7 @@ class TimestampITCase extends BatchTestBase {
       }
     }
 
-    val instantsOfTimestamp = new mutable.MutableList[Instant]
+    val instantsOfTimestamp = new mutable.ListBuffer[Instant]
     for (i <- timestamps.indices) {
       if (timestamps(i) == null) {
         instantsOfTimestamp += null
@@ -94,7 +94,7 @@ class TimestampITCase extends BatchTestBase {
       }
     }
 
-    val data = new mutable.MutableList[Row]
+    val data = new mutable.ListBuffer[Row]
 
     for (i <- ints.indices) {
       data += row(
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/AggregationITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/AggregationITCase.scala
index 88a26d49f1e..baeb2c1d498 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/AggregationITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/AggregationITCase.scala
@@ -243,14 +243,14 @@ class AggregationITCase extends BatchTestBase {
     val myAgg = new NonMergableCount
 
     val t1 = BatchTableEnvUtil
-      .fromCollection(tEnv, new mutable.MutableList[(Int, String)], "a, b")
+      .fromCollection(tEnv, new mutable.ListBuffer[(Int, String)], "a, b")
       .select('a.sum, 'a.count)
     val t2 = BatchTableEnvUtil
-      .fromCollection(tEnv, new mutable.MutableList[(Int, String)], "a, b")
+      .fromCollection(tEnv, new mutable.ListBuffer[(Int, String)], "a, b")
       .select('a.sum, myAgg('b), 'a.count)
     // test agg with empty parameter
     val t3 = BatchTableEnvUtil
-      .fromCollection(tEnv, new mutable.MutableList[(Int, String)], "a, b")
+      .fromCollection(tEnv, new mutable.ListBuffer[(Int, String)], "a, b")
       .select('a.sum, myAgg(), 'a.count)
 
     val expected1 = "null,0"
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CalcITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CalcITCase.scala
index a464fb6c156..e29e0a0b53a 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CalcITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CalcITCase.scala
@@ -445,7 +445,7 @@ class CalcITCase extends BatchTestBase {
 
   @Test
   def testRowType(): Unit = {
-    val data = new mutable.MutableList[(Int, Long, String)]
+    val data = new mutable.ListBuffer[(Int, Long, String)]
     data.+=((1, 1L, "Jack#22"))
     data.+=((2, 2L, "John#19"))
     data.+=((3, 2L, "Anna#44"))
@@ -541,7 +541,7 @@ class CalcITCase extends BatchTestBase {
       "{3=Hello world}\n"
     TestBaseUtils.compareResultAsText(result3.asJava, expected3)
 
-    val data = new mutable.MutableList[(String, BigDecimal, String, 
BigDecimal)]
+    val data = new mutable.ListBuffer[(String, BigDecimal, String, BigDecimal)]
     data.+=(("AAA", BigDecimal.valueOf(123.45), "BBB", 
BigDecimal.valueOf(234.56)))
     data.+=(("CCC", BigDecimal.valueOf(345.67), "DDD", 
BigDecimal.valueOf(456.78)))
     data.+=(("EEE", BigDecimal.valueOf(567.89), "FFF", 
BigDecimal.valueOf(678.99)))
@@ -557,7 +557,7 @@ class CalcITCase extends BatchTestBase {
 
   @Test
   def testValueConstructor(): Unit = {
-    val data = new mutable.MutableList[(String, Int, LocalDateTime)]
+    val data = new mutable.ListBuffer[(String, Int, LocalDateTime)]
     data.+=(("foo", 12, localDateTime("1984-07-12 14:34:24")))
     val t = BatchTableEnvUtil
       .fromCollection(tEnv, data, "a, b, c")
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CorrelateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CorrelateITCase.scala
index a97c34542fb..53ae92ac83c 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CorrelateITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CorrelateITCase.scala
@@ -378,7 +378,7 @@ class CorrelateITCase extends BatchTestBase {
 
   private def testData: Table = {
 
-    val data = new mutable.MutableList[(Int, Long, String)]
+    val data = new mutable.ListBuffer[(Int, Long, String)]
     data.+=((1, 1L, "Jack#22"))
     data.+=((2, 2L, "John#19"))
     data.+=((3, 2L, "Anna#44"))
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/SetOperatorsITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/SetOperatorsITCase.scala
index 53682d6c772..c9d5476c4b7 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/SetOperatorsITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/SetOperatorsITCase.scala
@@ -170,7 +170,7 @@ class SetOperatorsITCase extends BatchTestBase {
   @Test
   def testIntersect(): Unit = {
     val ds1 = CollectionBatchExecTable.getSmall3TupleDataSet(tEnv, "a, b, c")
-    val data = new mutable.MutableList[(Int, Long, String)]
+    val data = new mutable.ListBuffer[(Int, Long, String)]
     data.+=((1, 1L, "Hi"))
     data.+=((2, 2L, "Hello"))
     data.+=((2, 2L, "Hello"))
@@ -187,9 +187,9 @@ class SetOperatorsITCase extends BatchTestBase {
 
   @Test
   def testIntersectAll(): Unit = {
-    val data1 = new mutable.MutableList[Int]
+    val data1 = new mutable.ListBuffer[Int]
     data1 += (1, 1, 1, 2, 2)
-    val data2 = new mutable.MutableList[Int]
+    val data2 = new mutable.ListBuffer[Int]
     data2 += (1, 2, 2, 2, 3)
     val ds1 = BatchTableEnvUtil.fromCollection(tEnv, data1, "c")
     val ds2 = BatchTableEnvUtil.fromCollection(tEnv, data2, "c")
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/GroupAggregateHarnessTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/GroupAggregateHarnessTest.scala
index 932605af5f2..fb37dd71454 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/GroupAggregateHarnessTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/GroupAggregateHarnessTest.scala
@@ -354,7 +354,7 @@ class GroupAggregateHarnessTest(
 
   private def createAggregation()
       : (KeyedOneInputStreamOperatorTestHarness[RowData, RowData, RowData], 
Array[LogicalType]) = {
-    val data = new mutable.MutableList[(String, String, Long)]
+    val data = new mutable.ListBuffer[(String, String, Long)]
     val t = StreamingEnvUtil
       .fromCollection(env, data)
       .toTable(tEnv, 'a, 'b, 'c)
@@ -385,7 +385,7 @@ class GroupAggregateHarnessTest(
       : (OneInputStreamOperatorTestHarness[RowData, RowData], 
KeyedOneInputStreamOperatorTestHarness[RowData, RowData, RowData], 
Array[LogicalType]) = {
     tEnv.getConfig.set(TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, 
AggregatePhaseStrategy.TWO_PHASE)
 
-    val data = new mutable.MutableList[(String, String, Long)]
+    val data = new mutable.ListBuffer[(String, String, Long)]
     val t = StreamingEnvUtil
       .fromCollection(env, data)
       .toTable(tEnv, 'a, 'b, 'c)
@@ -411,7 +411,7 @@ class GroupAggregateHarnessTest(
 
   private def createAggregationWithDistinct()
       : (KeyedOneInputStreamOperatorTestHarness[RowData, RowData, RowData], 
Array[LogicalType]) = {
-    val data = new mutable.MutableList[(String, String, Long)]
+    val data = new mutable.ListBuffer[(String, String, Long)]
     val t = StreamingEnvUtil
       .fromCollection(env, data)
       .toTable(tEnv, 'a, 'b, 'c)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/OverAggregateHarnessTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/OverAggregateHarnessTest.scala
index f3268670140..696fef0efc0 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/OverAggregateHarnessTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/OverAggregateHarnessTest.scala
@@ -123,7 +123,7 @@ class OverAggregateHarnessTest(mode: StateBackendMode) 
extends HarnessTestBase(m
 
   private def createProcTimeBoundedRowsOver()
       : (KeyedOneInputStreamOperatorTestHarness[RowData, RowData, RowData], 
Array[LogicalType]) = {
-    val data = new mutable.MutableList[(Long, String, Long)]
+    val data = new mutable.ListBuffer[(Long, String, Long)]
     val t = StreamingEnvUtil
       .fromCollection(env, data)
       .toTable(tEnv, 'currtime, 'b, 'c, 'proctime.proctime)
@@ -158,7 +158,7 @@ class OverAggregateHarnessTest(mode: StateBackendMode) 
extends HarnessTestBase(m
   @TestTemplate
   def testProcTimeBoundedRangeOver(): Unit = {
 
-    val data = new mutable.MutableList[(Long, String, Long)]
+    val data = new mutable.ListBuffer[(Long, String, Long)]
     val t = StreamingEnvUtil
       .fromCollection(env, data)
       .toTable(tEnv, 'currtime, 'b, 'c, 'proctime.proctime)
@@ -264,7 +264,7 @@ class OverAggregateHarnessTest(mode: StateBackendMode) 
extends HarnessTestBase(m
   @TestTemplate
   def testProcTimeUnboundedOver(): Unit = {
 
-    val data = new mutable.MutableList[(Long, String, Long)]
+    val data = new mutable.ListBuffer[(Long, String, Long)]
     val t = StreamingEnvUtil
       .fromCollection(env, data)
       .toTable(tEnv, 'currtime, 'b, 'c, 'proctime.proctime)
@@ -355,7 +355,7 @@ class OverAggregateHarnessTest(mode: StateBackendMode) 
extends HarnessTestBase(m
   @TestTemplate
   def testRowTimeBoundedRangeOver(): Unit = {
 
-    val data = new mutable.MutableList[(Long, String, Long)]
+    val data = new mutable.ListBuffer[(Long, String, Long)]
     val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 
'rowtime.rowtime, 'b, 'c)
     tEnv.createTemporaryView("T", t)
 
@@ -452,7 +452,7 @@ class OverAggregateHarnessTest(mode: StateBackendMode) 
extends HarnessTestBase(m
   @TestTemplate
   def testRowTimeBoundedRowsOver(): Unit = {
 
-    val data = new mutable.MutableList[(Long, String, Long)]
+    val data = new mutable.ListBuffer[(Long, String, Long)]
     val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 
'rowtime.rowtime, 'b, 'c)
     tEnv.createTemporaryView("T", t)
 
@@ -579,7 +579,7 @@ class OverAggregateHarnessTest(mode: StateBackendMode) 
extends HarnessTestBase(m
   @TestTemplate
   def testRowTimeUnboundedRangeOver(): Unit = {
 
-    val data = new mutable.MutableList[(Long, String, Long)]
+    val data = new mutable.ListBuffer[(Long, String, Long)]
     val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 
'rowtime.rowtime, 'b, 'c)
     tEnv.createTemporaryView("T", t)
 
@@ -701,7 +701,7 @@ class OverAggregateHarnessTest(mode: StateBackendMode) 
extends HarnessTestBase(m
   @TestTemplate
   def testRowTimeUnboundedRowsOver(): Unit = {
 
-    val data = new mutable.MutableList[(Long, String, Long)]
+    val data = new mutable.ListBuffer[(Long, String, Long)]
     val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 
'rowtime.rowtime, 'b, 'c)
     tEnv.createTemporaryView("T", t)
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/RankHarnessTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/RankHarnessTest.scala
index 4766668af92..734aa76a854 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/RankHarnessTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/RankHarnessTest.scala
@@ -62,7 +62,7 @@ class RankHarnessTest(mode: StateBackendMode, 
enableAsyncState: Boolean)
 
   @TestTemplate
   def testRetractRankWithRowNumber(): Unit = {
-    val data = new mutable.MutableList[(String, String, Long)]
+    val data = new mutable.ListBuffer[(String, String, Long)]
     val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b, 
'c)
     tEnv.createTemporaryView("T", t)
     tEnv.createTemporarySystemFunction(
@@ -160,7 +160,7 @@ class RankHarnessTest(mode: StateBackendMode, 
enableAsyncState: Boolean)
 
   @TestTemplate
   def testRetractRankWithoutRowNumber(): Unit = {
-    val data = new mutable.MutableList[(String, String, Long)]
+    val data = new mutable.ListBuffer[(String, String, Long)]
     val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b, 
'c)
     tEnv.createTemporaryView("T", t)
     tEnv.createTemporarySystemFunction(
@@ -240,7 +240,7 @@ class RankHarnessTest(mode: StateBackendMode, 
enableAsyncState: Boolean)
 
   def prepareUpdateRankWithRowNumberTester()
       : (KeyedOneInputStreamOperatorTestHarness[RowData, RowData, RowData], 
RowDataHarnessAssertor) = {
-    val data = new mutable.MutableList[(String, Int, Int)]
+    val data = new mutable.ListBuffer[(String, Int, Int)]
     val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'word, 
'cnt, 'type)
     tEnv.createTemporaryView("T", t)
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/TableAggregateHarnessTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/TableAggregateHarnessTest.scala
index 14f16d3f832..2b1737ef6af 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/TableAggregateHarnessTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/TableAggregateHarnessTest.scala
@@ -51,7 +51,7 @@ class TableAggregateHarnessTest(mode: StateBackendMode) 
extends HarnessTestBase(
     this.tEnv = StreamTableEnvironmentImpl.create(env, setting)
   }
 
-  val data = new mutable.MutableList[(Int, Int)]
+  val data = new mutable.ListBuffer[(Int, Int)]
 
   @TestTemplate
   def testTableAggregate(): Unit = {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
index fbee1a8585f..dd449ff6f93 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
@@ -87,7 +87,7 @@ class AggregateITCase(
 
   @TestTemplate
   def testEmptyInputAggregation(): Unit = {
-    val data = new mutable.MutableList[(Int, Int)]
+    val data = new mutable.ListBuffer[(Int, Int)]
     data.+=((1, 1))
     data.+=((2, 2))
     data.+=((3, 3))
@@ -106,7 +106,7 @@ class AggregateITCase(
 
   @TestTemplate
   def testMaxAggRetractWithCondition(): Unit = {
-    val data = new mutable.MutableList[(Int, Int)]
+    val data = new mutable.ListBuffer[(Int, Int)]
     data.+=((1, 10))
     data.+=((1, 10))
     data.+=((2, 5))
@@ -131,7 +131,7 @@ class AggregateITCase(
 
   @TestTemplate
   def testMinAggRetractWithCondition(): Unit = {
-    val data = new mutable.MutableList[(Int, Int)]
+    val data = new mutable.ListBuffer[(Int, Int)]
     data.+=((1, 5))
     data.+=((2, 6))
     data.+=((1, 5))
@@ -155,7 +155,7 @@ class AggregateITCase(
 
   @TestTemplate
   def testShufflePojo(): Unit = {
-    val data = new mutable.MutableList[(Int, Int)]
+    val data = new mutable.ListBuffer[(Int, Int)]
     data.+=((1, 1))
     data.+=((2, 2))
     data.+=((3, 3))
@@ -176,7 +176,7 @@ class AggregateITCase(
   @Disabled("[FLINK-12215] Fix this when introduce SqlProcessFunction.")
   @TestTemplate
   def testEmptyInputAggregationWithoutGroupBy(): Unit = {
-    val data = new mutable.MutableList[(Int, Int)]
+    val data = new mutable.ListBuffer[(Int, Int)]
     data.+=((1, 1))
     data.+=((2, 2))
     data.+=((3, 3))
@@ -195,7 +195,7 @@ class AggregateITCase(
 
   @TestTemplate
   def testAggregationWithoutWatermark(): Unit = {
-    val data = new mutable.MutableList[(Int, Int)]
+    val data = new mutable.ListBuffer[(Int, Int)]
     data.+=((1, 1))
     data.+=((2, 2))
     data.+=((3, 3))
@@ -297,7 +297,7 @@ class AggregateITCase(
 
     val chars = List("A", "B", null, "D", "E", "F", "H", null, null, "K", "L", 
"L", "N", "O", "P")
 
-    val data = new mutable.MutableList[Row]
+    val data = new mutable.ListBuffer[Row]
 
     for (i <- ids.indices) {
       val v = integers(i)
@@ -359,7 +359,7 @@ class AggregateITCase(
   @TestTemplate
   def testDistinctWithRetract(): Unit = {
     // this case covers LongArrayValueWithRetractionGenerator and 
LongValueWithRetractionGenerator
-    val data = new mutable.MutableList[(Int, Long, String)]
+    val data = new mutable.ListBuffer[(Int, Long, String)]
     data.+=((1, 1L, "A"))
     data.+=((1, 1L, "A"))
     data.+=((1, 1L, "A"))
@@ -411,7 +411,7 @@ class AggregateITCase(
   @TestTemplate
   def testDistinctAggregateMoreThan64(): Unit = {
     // this case is used to cover 
DistinctAggCodeGen#LongArrayValueWithoutRetractionGenerator
-    val data = new mutable.MutableList[(Int, Int)]
+    val data = new mutable.ListBuffer[(Int, Int)]
     for (i <- 0 until 100) {
       for (j <- 0 until 100 - i) {
         data.+=((j, i))
@@ -442,7 +442,7 @@ class AggregateITCase(
 
   @TestTemplate
   def testDistinctAggWithNullValues(): Unit = {
-    val data = new mutable.MutableList[(Int, Long, String)]
+    val data = new mutable.ListBuffer[(Int, Long, String)]
     data.+=((1, 1L, "A"))
     data.+=((2, 2L, "B"))
     data.+=((3, 2L, "B"))
@@ -489,7 +489,7 @@ class AggregateITCase(
     var expected = 
List("1.03520274,12345.03520274865400000000,12.3456789012345670000000")
     assertThat(sink.getRetractResults).isEqualTo(expected)
 
-    val data = new mutable.MutableList[Double]
+    val data = new mutable.ListBuffer[Double]
     data.+=(1.11111111)
     data.+=(1.11111111)
     env.setParallelism(1)
@@ -603,7 +603,7 @@ class AggregateITCase(
     var expected = 
List("1.03520274,12345.03520274865400000000,12.3456789012345670000000")
     assertThat(sink.getRetractResults).isEqualTo(expected)
 
-    val data = new mutable.MutableList[Double]
+    val data = new mutable.ListBuffer[Double]
     data.+=(2.22222222)
     data.+=(3.33333333)
     env.setParallelism(1)
@@ -624,7 +624,7 @@ class AggregateITCase(
 
   @TestTemplate
   def testGroupByAgg(): Unit = {
-    val data = new mutable.MutableList[(Int, Long, String)]
+    val data = new mutable.ListBuffer[(Int, Long, String)]
     data.+=((1, 1L, "A"))
     data.+=((2, 2L, "B"))
     data.+=((3, 2L, "B"))
@@ -651,7 +651,7 @@ class AggregateITCase(
   }
 
   def testCountWithNullableIfCall(): Unit = {
-    val data = new mutable.MutableList[(Int, Long, String)]
+    val data = new mutable.ListBuffer[(Int, Long, String)]
     data.+=((1, 1L, "A"))
     data.+=((2, 2L, "B"))
     data.+=((3, 2L, "B"))
@@ -689,7 +689,7 @@ class AggregateITCase(
 
   @TestTemplate
   def testNestedGroupByAgg(): Unit = {
-    val data = new mutable.MutableList[(Int, Long, String)]
+    val data = new mutable.ListBuffer[(Int, Long, String)]
     data.+=((1, 1L, "A"))
     data.+=((2, 2L, "B"))
     data.+=((3, 2L, "B"))
@@ -857,7 +857,7 @@ class AggregateITCase(
 
   @TestTemplate
   def testListAggWithDistinct(): Unit = {
-    val data = new mutable.MutableList[(Int, Long, String)]
+    val data = new mutable.ListBuffer[(Int, Long, String)]
     data.+=((1, 1L, "A"))
     data.+=((2, 2L, "B"))
     data.+=((3, 2L, "B"))
@@ -927,7 +927,7 @@ class AggregateITCase(
 
   @TestTemplate
   def testGroupBySingleValue(): Unit = {
-    val data = new mutable.MutableList[(Int, Long, String)]
+    val data = new mutable.ListBuffer[(Int, Long, String)]
     data.+=((1, 1L, "A"))
     data.+=((2, 2L, "B"))
     data.+=((3, 2L, "B"))
@@ -991,7 +991,7 @@ class AggregateITCase(
 
   @TestTemplate
   def testDecimalSum(): Unit = {
-    val data = new mutable.MutableList[Row]
+    val data = new mutable.ListBuffer[Row]
     data.+=(Row.of(BigDecimal(1).bigDecimal))
     data.+=(Row.of(BigDecimal(2).bigDecimal))
     data.+=(Row.of(BigDecimal(2).bigDecimal))
@@ -1204,7 +1204,7 @@ class AggregateITCase(
 
   @TestTemplate
   def testMinMaxWithBinaryString(): Unit = {
-    val data = new mutable.MutableList[(Int, Long, String)]
+    val data = new mutable.ListBuffer[(Int, Long, String)]
     data.+=((1, 1L, "A"))
     data.+=((2, 2L, "B"))
     data.+=((3, 2L, "BC"))
@@ -1239,7 +1239,7 @@ class AggregateITCase(
 
   @TestTemplate
   def testBigDataOfMinMaxWithBinaryString(): Unit = {
-    val data = new mutable.MutableList[(Int, Long, String)]
+    val data = new mutable.ListBuffer[(Int, Long, String)]
     for (i <- 0 until 100) {
       data.+=((i % 10, i, i.toString))
     }
@@ -1272,7 +1272,7 @@ class AggregateITCase(
 
   @TestTemplate
   def testAggWithFilterClause(): Unit = {
-    val data = new mutable.MutableList[(Int, Long, String, Boolean)]
+    val data = new mutable.ListBuffer[(Int, Long, String, Boolean)]
     data.+=((1, 5L, "B", true))
     data.+=((1, 4L, "C", false))
     data.+=((1, 2L, "A", true))
@@ -1307,7 +1307,7 @@ class AggregateITCase(
 
   @TestTemplate
   def testMinMaxWithDecimal(): Unit = {
-    val data = new mutable.MutableList[Row]
+    val data = new mutable.ListBuffer[Row]
     data.+=(Row.of(BigDecimal(1).bigDecimal))
     data.+=(Row.of(BigDecimal(2).bigDecimal))
     data.+=(Row.of(BigDecimal(2).bigDecimal))
@@ -1483,7 +1483,7 @@ class AggregateITCase(
          |GROUP BY c
          |""".stripMargin
 
-    val data = new mutable.MutableList[(Int, Long, String)]
+    val data = new mutable.ListBuffer[(Int, Long, String)]
     for (i <- 0 until 10) {
       data.+=((i, 1L, "Hi"))
     }
@@ -1511,7 +1511,7 @@ class AggregateITCase(
          |FROM MyTable GROUP BY a) t1
          |group by t1.a
          |""".stripMargin
-    val data = new mutable.MutableList[(Int, String)]
+    val data = new mutable.ListBuffer[(Int, String)]
     data.+=((1, "Sam"))
     data.+=((1, "Jerry"))
     data.+=((2, "Ali"))
@@ -1532,7 +1532,7 @@ class AggregateITCase(
   def testSTDDEV(): Unit = {
     val sqlQuery = "SELECT STDDEV_SAMP(a), STDDEV_POP(a) FROM MyTable GROUP BY c"
 
-    val data = new mutable.MutableList[(Double, Long, String)]
+    val data = new mutable.ListBuffer[(Double, Long, String)]
     for (i <- 0 until 10) {
       data.+=((i, 1L, "Hi"))
     }
@@ -1552,7 +1552,7 @@ class AggregateITCase(
   def testVAR_POP(): Unit = {
     val sqlQuery = "SELECT VAR_POP(a) FROM MyTable GROUP BY c"
 
-    val data = new mutable.MutableList[(Int, Long, String)]
+    val data = new mutable.ListBuffer[(Int, Long, String)]
     data.+=((2900, 1L, "Hi"))
     data.+=((2500, 1L, "Hi"))
     data.+=((2600, 1L, "Hi"))
@@ -1680,7 +1680,7 @@ class AggregateITCase(
 
   @TestTemplate
   def testPruneUselessAggCall(): Unit = {
-    val data = new mutable.MutableList[(Int, Long, String)]
+    val data = new mutable.ListBuffer[(Int, Long, String)]
     data.+=((1, 1L, "Hi"))
     data.+=((2, 2L, "Hello"))
     data.+=((3, 2L, "Hello world"))
@@ -1714,7 +1714,7 @@ class AggregateITCase(
 
   @TestTemplate
   def testConstantGroupKeyWithUpsertSink(): Unit = {
-    val data = new mutable.MutableList[(Int, Long, String)]
+    val data = new mutable.ListBuffer[(Int, Long, String)]
     data.+=((1, 1L, "A"))
     data.+=((2, 2L, "B"))
     data.+=((3, 2L, "B"))
@@ -1780,7 +1780,7 @@ class AggregateITCase(
 
   @TestTemplate
   def testOverloadedAccumulator(): Unit = {
-    val data = new mutable.MutableList[(String, Long)]
+    val data = new mutable.ListBuffer[(String, Long)]
     data.+=(("x", 1L))
     data.+=(("x", 2L))
     data.+=(("x", 3L))
@@ -2062,7 +2062,7 @@ class AggregateITCase(
          |FROM ($sql)
     """.stripMargin
 
-    val data = new mutable.MutableList[(Int, Int, Int)]
+    val data = new mutable.ListBuffer[(Int, Int, Int)]
     for (i <- 0 until 10) {
       data.+=((i * 2, i + 1, 0))
       data.+=((i * 2, i + 1, 1))
@@ -2122,7 +2122,7 @@ class AggregateITCase(
 
   @TestTemplate
   def testBitmapBuildAgg(): Unit = {
-    val data = new mutable.MutableList[(Int, Int, String)]
+    val data = new mutable.ListBuffer[(Int, Int, String)]
     for (i <- 0 until 5) {
       data.+=((i, -i, "a"))
       data.+=((i * 2, -i * 2, "b"))
@@ -2156,7 +2156,7 @@ class AggregateITCase(
 
   @TestTemplate
   def testBitmapBuildAggWithRetract(): Unit = {
-    val data = new mutable.MutableList[(Int, Int, String)]
+    val data = new mutable.ListBuffer[(Int, Int, String)]
     for (i <- 0 until 5) {
       data.+=((i, i, "a"))
       data.+=((i + 1, i * 2, "b"))
@@ -2193,7 +2193,7 @@ class AggregateITCase(
 
   @TestTemplate
   def testBitmapLogicalOpsAgg(): Unit = {
-    val data = new mutable.MutableList[(Int, Int, Int, String)]
+    val data = new mutable.ListBuffer[(Int, Int, Int, String)]
     data.+=((-3, 5, 0, "a"))
     data.+=((7, 2, 5, "b"))
     data.+=((-3, 8, -8, "c"))
@@ -2238,7 +2238,7 @@ class AggregateITCase(
 
   @TestTemplate
   def testBitmapLogicalOpsAggWithRetract(): Unit = {
-    val data = new mutable.MutableList[(Int, Int, Int, String)]
+    val data = new mutable.ListBuffer[(Int, Int, Int, String)]
     for (i <- 3 until 0 by -1) {
       data.+=((i, i + 1, i + 2, "a"))
       data.+=((i * 2, i * 2 + 1, i * 2 + 2, "b"))
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateRemoveITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateRemoveITCase.scala
index 01007463b45..687d27c7d6e 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateRemoveITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateRemoveITCase.scala
@@ -267,7 +267,7 @@ class AggregateRemoveITCase(aggMode: AggMode, minibatch: 
MiniBatchMode, backend:
 
   @TestTemplate
   def testAggregateRemove(): Unit = {
-    val data = new mutable.MutableList[(Int, Int)]
+    val data = new mutable.ListBuffer[(Int, Int)]
     data.+=((1, 1))
     data.+=((2, 2))
     data.+=((3, 3))
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CorrelateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CorrelateITCase.scala
index b811e2ebeea..99a11bfe4d5 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CorrelateITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CorrelateITCase.scala
@@ -193,7 +193,7 @@ class CorrelateITCase extends StreamingTestBase {
 
   @Test
   def testMultipleCorrelate(): Unit = {
-    val data = new mutable.MutableList[(String, String, String)]
+    val data = new mutable.ListBuffer[(String, String, String)]
     data.+=(("1", "1,L", "A,B"))
     data.+=(("2", "2,L", "B,C"))
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeduplicateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeduplicateITCase.scala
index 5065c236062..4304ed0b183 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeduplicateITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeduplicateITCase.scala
@@ -47,7 +47,7 @@ class DeduplicateITCase(miniBatch: MiniBatchMode, mode: 
StateBackendMode, enable
   @RegisterExtension private val _: EachCallbackWrapper[LegacyRowExtension] =
     new EachCallbackWrapper[LegacyRowExtension](new LegacyRowExtension)
 
-  lazy val rowtimeTestData = new mutable.MutableList[(Int, Long, String)]
+  lazy val rowtimeTestData = new mutable.ListBuffer[(Int, Long, String)]
   rowtimeTestData.+=((1, 1L, "Hi"))
   rowtimeTestData.+=((1, 3L, "Hello"))
   rowtimeTestData.+=((1, 2L, "Hello world"))
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala
index 253208e5062..6036599a9ee 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala
@@ -50,7 +50,7 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
         |    t2.proctime + INTERVAL '5' SECOND
         |""".stripMargin
 
-    val data1 = new mutable.MutableList[(Int, Long, String)]
+    val data1 = new mutable.ListBuffer[(Int, Long, String)]
     data1.+=((1, 1L, "Hi1"))
     data1.+=((1, 2L, "Hi2"))
     data1.+=((1, 5L, "Hi3"))
@@ -58,7 +58,7 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
     data1.+=((1, 9L, "Hi6"))
     data1.+=((1, 8L, "Hi8"))
 
-    val data2 = new mutable.MutableList[(Int, Long, String)]
+    val data2 = new mutable.ListBuffer[(Int, Long, String)]
     data2.+=((1, 1L, "HiHi"))
     data2.+=((2, 2L, "HeHe"))
 
@@ -97,7 +97,7 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
         |    t2.proctime + INTERVAL '5' SECOND
         |""".stripMargin
 
-    val data1 = new mutable.MutableList[(Int, Long, String)]
+    val data1 = new mutable.ListBuffer[(Int, Long, String)]
     data1.+=((1, 1L, "Hi1"))
     data1.+=((1, 2L, "Hi2"))
     data1.+=((1, 5L, "Hi3"))
@@ -105,7 +105,7 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
     data1.+=((1, 9L, "Hi6"))
     data1.+=((1, 8L, "Hi8"))
 
-    val data2 = new mutable.MutableList[(Int, Long, String)]
+    val data2 = new mutable.ListBuffer[(Int, Long, String)]
     data2.+=((1, 1L, "HiHi"))
     data2.+=((2, 2L, "HeHe"))
 
@@ -146,7 +146,7 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
         |  t1.b = t2.b
         |""".stripMargin
 
-    val data1 = new mutable.MutableList[(String, Long, String)]
+    val data1 = new mutable.ListBuffer[(String, Long, String)]
     data1.+=(("1", 1L, "Hi1"))
     data1.+=(("1", 2L, "Hi2"))
     data1.+=(("1", 5L, "Hi3"))
@@ -154,7 +154,7 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
     data1.+=(("1", 9L, "Hi6"))
     data1.+=(("1", 8L, "Hi8"))
 
-    val data2 = new mutable.MutableList[(String, Long, String)]
+    val data2 = new mutable.ListBuffer[(String, Long, String)]
     data2.+=(("1", 5L, "HiHi"))
     data2.+=(("2", 2L, "HeHe"))
 
@@ -193,7 +193,7 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
         |    t2.rowtime + INTERVAL '6' SECOND
         |""".stripMargin
 
-    val data1 = new mutable.MutableList[(String, String, Long)]
+    val data1 = new mutable.ListBuffer[(String, String, Long)]
     // for boundary test
     data1.+=(("A", "LEFT0.999", 999L))
     data1.+=(("A", "LEFT1", 1000L))
@@ -205,7 +205,7 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
     // test null key
     data1.+=((null.asInstanceOf[String], "LEFT8", 8000L))
 
-    val data2 = new mutable.MutableList[(String, String, Long)]
+    val data2 = new mutable.ListBuffer[(String, String, Long)]
     data2.+=(("A", "RIGHT6", 6000L))
     data2.+=(("B", "RIGHT7", 7000L))
     // test null key
@@ -226,7 +226,7 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
     val result = tEnv.sqlQuery(sqlQuery).toDataStream
     result.addSink(sink)
     env.execute()
-    val expected = mutable.MutableList(
+    val expected = mutable.ListBuffer(
       "A,RIGHT6,LEFT1",
       "A,RIGHT6,LEFT2",
       "A,RIGHT6,LEFT3",
@@ -248,7 +248,7 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
         |    t2.rowtime + INTERVAL '6' SECOND
         |""".stripMargin
 
-    val data1 = new mutable.MutableList[(String, String, Long)]
+    val data1 = new mutable.ListBuffer[(String, String, Long)]
     // for boundary test
     data1.+=(("A", "LEFT0.999", 999L))
     data1.+=(("A", "LEFT1", 1000L))
@@ -260,7 +260,7 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
     // test null key
     data1.+=((null.asInstanceOf[String], "LEFT8", 8000L))
 
-    val data2 = new mutable.MutableList[(String, String, Long)]
+    val data2 = new mutable.ListBuffer[(String, String, Long)]
     data2.+=(("A", "RIGHT6", 6000L))
     data2.+=(("B", "RIGHT7", 7000L))
     // test null key
@@ -281,7 +281,7 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
     val result = tEnv.sqlQuery(sqlQuery).toDataStream
     result.addSink(sink)
     env.execute()
-    val expected = mutable.MutableList(
+    val expected = mutable.ListBuffer(
       "A,RIGHT6,LEFT1",
       "A,RIGHT6,LEFT2",
       "A,RIGHT6,LEFT3",
@@ -303,7 +303,7 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
         |    t2.rowtime + INTERVAL '6' SECOND
         |""".stripMargin
 
-    val data1 = new mutable.MutableList[(String, String, Long)]
+    val data1 = new mutable.ListBuffer[(String, String, Long)]
     // for boundary test
     data1.+=(("A", "LEFT0.999", 999L))
     data1.+=(("A", "LEFT1", 1000L))
@@ -315,7 +315,7 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
     // test null key
     data1.+=((null.asInstanceOf[String], "LEFT8", 8000L))
 
-    val data2 = new mutable.MutableList[(String, String, Long)]
+    val data2 = new mutable.ListBuffer[(String, String, Long)]
     data2.+=(("A", "RIGHT6", 6000L))
     data2.+=(("B", "RIGHT7", 7000L))
     // test null key
@@ -336,7 +336,7 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
     val result = tEnv.sqlQuery(sqlQuery).toDataStream
     result.addSink(sink)
     env.execute()
-    val expected = mutable.MutableList(
+    val expected = mutable.ListBuffer(
       "A,RIGHT6,LEFT1",
       "A,RIGHT6,LEFT2",
       "A,RIGHT6,LEFT3",
@@ -371,7 +371,7 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
     val sqlQuery = "SELECT key, COUNT(DISTINCT id1), COUNT(DISTINCT id2) FROM (" +
       innerSql + ") GROUP BY key"
 
-    val data1 = new mutable.MutableList[(String, String, Long)]
+    val data1 = new mutable.ListBuffer[(String, String, Long)]
     // for boundary test
     data1.+=(("A", "LEFT0.999", 999L))
     data1.+=(("A", "LEFT1", 1000L))
@@ -383,7 +383,7 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
     // test null key
     data1.+=((null.asInstanceOf[String], "LEFT8", 8000L))
 
-    val data2 = new mutable.MutableList[(String, String, Long)]
+    val data2 = new mutable.ListBuffer[(String, String, Long)]
     data2.+=(("A", "RIGHT6", 6000L))
     data2.+=(("B", "RIGHT7", 7000L))
     // test null key
@@ -404,7 +404,7 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
     val result = tEnv.sqlQuery(sqlQuery).toRetractStream[Row]
     result.addSink(sink)
     env.execute()
-    val expected = mutable.MutableList("A,1,5", "B,1,1")
+    val expected = mutable.ListBuffer("A,1,5", "B,1,1")
     assertThat(sink.getRetractResults.sorted).isEqualTo(expected)
   }
 
@@ -420,7 +420,7 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
         |t2.rowtime = t1.rowtime
       """.stripMargin
 
-    val data1 = new mutable.MutableList[(String, Long, String)]
+    val data1 = new mutable.ListBuffer[(String, Long, String)]
     data1.+=(("K1", 1000L, "L1"))
     data1.+=(("K1", 1000L, "L2"))
     data1.+=(("K1", 1000L, "L3"))
@@ -433,7 +433,7 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
     // See https://issues.apache.org/jira/browse/FLINK-24466
     // data1.+=(("K2", 1000L, "should-be-discarded"))
 
-    val data2 = new mutable.MutableList[(String, Long, String)]
+    val data2 = new mutable.ListBuffer[(String, Long, String)]
     data2.+=(("K1", 1000L, "R1"))
     data2.+=(("K1", 1000L, "R2"))
     data2.+=(("K1", 1000L, "R3"))
@@ -462,7 +462,7 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
     result.addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList[String](
+    val expected = mutable.ListBuffer[String](
       "K1,1000,L1,R1",
       "K1,1000,L1,R2",
       "K1,1000,L1,R3",
@@ -494,7 +494,7 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
         |  t1.b > 2
         |""".stripMargin
 
-    val data1 = new mutable.MutableList[(Int, Long, String, Long)]
+    val data1 = new mutable.ListBuffer[(Int, Long, String, Long)]
     data1.+=((1, 4L, "LEFT1", 1000L))
     // for boundary test
     data1.+=((1, 8L, "LEFT1.1", 1001L))
@@ -507,7 +507,7 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
     data1.+=((1, 4L, "LEFT5", 5000L))
     data1.+=((1, 10L, "LEFT6", 6000L))
 
-    val data2 = new mutable.MutableList[(Int, Long, String, Long)]
+    val data2 = new mutable.ListBuffer[(Int, Long, String, Long)]
     // just for watermark
     data2.+=((1, 1L, "RIGHT1", 1000L))
     data2.+=((1, 9L, "RIGHT6", 6000L))
@@ -531,7 +531,7 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
     env.execute()
 
     // There may be two expected results according to the process order.
-    val expected = mutable.MutableList[String](
+    val expected = mutable.ListBuffer[String](
       "1,LEFT3,RIGHT6",
       "1,LEFT1.1,RIGHT6",
       "2,LEFT4,RIGHT7",
@@ -552,7 +552,7 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
         |  QUARTER(t1.rowtime) = t2.a
         |""".stripMargin
 
-    val data1 = new mutable.MutableList[(Int, Long, String, Long)]
+    val data1 = new mutable.ListBuffer[(Int, Long, String, Long)]
     data1.+=((1, 4L, "LEFT1", 1000L))
     data1.+=((1, 2L, "LEFT2", 2000L))
     data1.+=((1, 7L, "LEFT3", 3000L))
@@ -560,7 +560,7 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
     data1.+=((1, 4L, "LEFT5", 5000L))
     data1.+=((1, 10L, "LEFT6", 6000L))
 
-    val data2 = new mutable.MutableList[(Int, Long, String, Long)]
+    val data2 = new mutable.ListBuffer[(Int, Long, String, Long)]
     data2.+=((1, 1L, "RIGHT1", 1000L))
     data2.+=((1, 9L, "RIGHT6", 6000L))
     data2.+=((2, 8, "RIGHT7", 7000L))
@@ -583,7 +583,7 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
     env.execute()
 
     val expected = mutable
-      .MutableList[String]("1,LEFT3,RIGHT6", "1,LEFT5,RIGHT6", "1,LEFT5,RIGHT8", "1,LEFT6,RIGHT8")
+      .ListBuffer[String]("1,LEFT3,RIGHT6", "1,LEFT5,RIGHT6", "1,LEFT5,RIGHT8", "1,LEFT6,RIGHT8")
 
     assertThat(sink.getAppendResults.sorted).isEqualTo(expected.toList.sorted)
   }
@@ -601,7 +601,7 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
         |GROUP BY TUMBLE(t1.rowtime, INTERVAL '4' SECOND), t1.key
         |""".stripMargin
 
-    val data1 = new mutable.MutableList[(String, String, Long)]
+    val data1 = new mutable.ListBuffer[(String, String, Long)]
     data1.+=(("A", "L-1", 1000L)) // no joining record
     data1.+=(("A", "L-2", 2000L)) // 1 joining record
     data1.+=(("A", "L-3", 3000L)) // 2 joining records
@@ -611,7 +611,7 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
     data1.+=(("A", "L-6", 10000L)) // 2 joining records
     data1.+=(("A", "L-7", 13000L)) // 1 joining record
 
-    val data2 = new mutable.MutableList[(String, String, Long)]
+    val data2 = new mutable.ListBuffer[(String, String, Long)]
     data2.+=(("A", "R-1", 7000L)) // 3 joining records
     data2.+=(("B", "R-4", 7000L)) // 1 joining records
     data2.+=(("A", "R-3", 8000L)) // 3 joining records
@@ -634,7 +634,7 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
     val result = t_r.toDataStream
     result.addSink(sink)
     env.execute()
-    val expected = mutable.MutableList[String](
+    val expected = mutable.ListBuffer[String](
       "A,1970-01-01T00:00:04,3",
       "A,1970-01-01T00:00:12,2",
       "A,1970-01-01T00:00:16,1",
@@ -656,7 +656,7 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
         | GROUP BY TUMBLE(t2.rowtime, INTERVAL '4' SECOND), t2.key
       """.stripMargin
 
-    val data1 = new mutable.MutableList[(String, String, Long)]
+    val data1 = new mutable.ListBuffer[(String, String, Long)]
     data1.+=(("A", "L-1", 1000L)) // no joining record
     data1.+=(("A", "L-2", 2000L)) // 1 joining record
     data1.+=(("A", "L-3", 3000L)) // 2 joining records
@@ -665,7 +665,7 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
     data1.+=(("A", "L-6", 10000L)) // 2 joining records
     data1.+=(("A", "L-7", 13000L)) // 1 joining record
 
-    val data2 = new mutable.MutableList[(String, String, Long)]
+    val data2 = new mutable.ListBuffer[(String, String, Long)]
     data2.+=(("A", "R-1", 7000L)) // 3 joining records
     data2.+=(("B", "R-4", 7000L)) // 1 joining records
     data2.+=(("A", "R-3", 8000L)) // 3 joining records
@@ -687,7 +687,7 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
     val result = tEnv.sqlQuery(sqlQuery).toDataStream
     result.addSink(sink)
     env.execute()
-    val expected = mutable.MutableList[String](
+    val expected = mutable.ListBuffer[String](
       "A,1970-01-01T00:00:08,3",
       "A,1970-01-01T00:00:12,3",
       "B,1970-01-01T00:00:08,1")
@@ -708,13 +708,13 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
         | t2.proctime + INTERVAL '3' SECOND
       """.stripMargin
 
-    val data1 = new mutable.MutableList[(Int, Long, String)]
+    val data1 = new mutable.ListBuffer[(Int, Long, String)]
     data1.+=((1, 1L, "Hi1"))
     data1.+=((1, 2L, "Hi2"))
     data1.+=((1, 5L, "Hi3"))
     data1.+=((2, 7L, "Hi5"))
 
-    val data2 = new mutable.MutableList[(Int, Long, String)]
+    val data2 = new mutable.ListBuffer[(Int, Long, String)]
     data2.+=((1, 1L, "HiHi"))
     data2.+=((2, 2L, "HeHe"))
 
@@ -748,7 +748,7 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
         | t1.id <> 'L-5'
       """.stripMargin
 
-    val data1 = new mutable.MutableList[(String, String, Long)]
+    val data1 = new mutable.ListBuffer[(String, String, Long)]
     // for boundary test
     data1.+=(("A", "L-1", 1000L))
     data1.+=(("A", "L-2", 2000L))
@@ -760,7 +760,7 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
     data1.+=(("A", "L-12", 12000L))
     data1.+=(("A", "L-20", 20000L))
 
-    val data2 = new mutable.MutableList[(String, String, Long)]
+    val data2 = new mutable.ListBuffer[(String, String, Long)]
     data2.+=(("A", "R-6", 6000L))
     data2.+=(("B", "R-7", 7000L))
     data2.+=(("D", "R-8", 8000L))
@@ -783,7 +783,7 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
     val result = tEnv.sqlQuery(sqlQuery).toDataStream
     result.addSink(sink)
     env.execute()
-    val expected = mutable.MutableList[String](
+    val expected = mutable.ListBuffer[String](
       "A,R-6,L-1",
       "A,R-6,L-2",
       "A,R-6,L-6",
@@ -812,13 +812,13 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
         |  t2.rowtime + INTERVAL '1' SECOND
       """.stripMargin
 
-    val data1 = new mutable.MutableList[(String, String, Long)]
+    val data1 = new mutable.ListBuffer[(String, String, Long)]
     // for boundary test
     data1.+=(("A", "L-1", 1000L))
     data1.+=(("B", "L-4", 4000L))
     data1.+=(("C", "L-7", 7000L))
 
-    val data2 = new mutable.MutableList[(String, String, Long)]
+    val data2 = new mutable.ListBuffer[(String, String, Long)]
     data2.+=(("A", "R-6", 6000L))
     data2.+=(("B", "R-7", 7000L))
     data2.+=(("D", "R-8", 8000L))
@@ -840,7 +840,7 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
     result.addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList[String](
+    val expected = mutable.ListBuffer[String](
       "null,null,L-1",
       "null,null,L-4",
       "null,null,L-7"
@@ -862,13 +862,13 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
         | t2.proctime + INTERVAL '3' SECOND
       """.stripMargin
 
-    val data1 = new mutable.MutableList[(Int, Long, String)]
+    val data1 = new mutable.ListBuffer[(Int, Long, String)]
     data1.+=((1, 1L, "Hi1"))
     data1.+=((1, 2L, "Hi2"))
     data1.+=((1, 5L, "Hi3"))
     data1.+=((2, 7L, "Hi5"))
 
-    val data2 = new mutable.MutableList[(Int, Long, String)]
+    val data2 = new mutable.ListBuffer[(Int, Long, String)]
     data2.+=((1, 1L, "HiHi"))
     data2.+=((2, 2L, "HeHe"))
 
@@ -901,7 +901,7 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
         | t2.id <> 'R-5'
       """.stripMargin
 
-    val data1 = new mutable.MutableList[(String, String, Long)]
+    val data1 = new mutable.ListBuffer[(String, String, Long)]
     // for boundary test
     data1.+=(("A", "L-1", 1000L))
     data1.+=(("A", "L-2", 2000L))
@@ -911,7 +911,7 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
     data1.+=(("A", "L-10", 10000L))
     data1.+=(("A", "L-12", 12000L))
 
-    val data2 = new mutable.MutableList[(String, String, Long)]
+    val data2 = new mutable.ListBuffer[(String, String, Long)]
     data2.+=(("A", "R-5", 5000L))
     data2.+=(("A", "R-6", 6000L))
     data2.+=(("B", "R-7", 7000L))
@@ -934,7 +934,7 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
     val result = tEnv.sqlQuery(sqlQuery).toDataStream
     result.addSink(sink)
     env.execute()
-    val expected = mutable.MutableList[String](
+    val expected = mutable.ListBuffer[String](
       "A,R-5,null",
       "A,R-6,L-1",
       "A,R-6,L-2",
@@ -960,13 +960,13 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
         |t2.rowtime + INTERVAL '1' SECOND
       """.stripMargin
 
-    val data1 = new mutable.MutableList[(String, String, Long)]
+    val data1 = new mutable.ListBuffer[(String, String, Long)]
     // for boundary test
     data1.+=(("A", "L-1", 1000L))
     data1.+=(("B", "L-4", 4000L))
     data1.+=(("C", "L-7", 7000L))
 
-    val data2 = new mutable.MutableList[(String, String, Long)]
+    val data2 = new mutable.ListBuffer[(String, String, Long)]
     data2.+=(("A", "R-6", 6000L))
     data2.+=(("B", "R-7", 7000L))
     data2.+=(("D", "R-8", 8000L))
@@ -988,7 +988,7 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
     result.addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList[String](
+    val expected = mutable.ListBuffer[String](
       "A,R-6,null",
       "B,R-7,null",
       "D,R-8,null"
@@ -1010,13 +1010,13 @@ class IntervalJoinITCase(mode: StateBackendMode) 
extends StreamingWithStateTestB
         |t2.proctime
       """.stripMargin
 
-    val data1 = new mutable.MutableList[(Int, Long, String)]
+    val data1 = new mutable.ListBuffer[(Int, Long, String)]
     data1.+=((1, 1L, "Hi1"))
     data1.+=((1, 2L, "Hi2"))
     data1.+=((1, 5L, "Hi3"))
     data1.+=((2, 7L, "Hi5"))
 
-    val data2 = new mutable.MutableList[(Int, Long, String)]
+    val data2 = new mutable.ListBuffer[(Int, Long, String)]
     data2.+=((1, 1L, "HiHi"))
     data2.+=((2, 2L, "HeHe"))
 
@@ -1048,7 +1048,7 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
         |NOT (t1.id = 'L-5' OR t2.id = 'R-5')
       """.stripMargin
 
-    val data1 = new mutable.MutableList[(String, String, Long)]
+    val data1 = new mutable.ListBuffer[(String, String, Long)]
     // for boundary test
     data1.+=(("A", "L-1", 1000L))
     data1.+=(("A", "L-2", 2000L))
@@ -1060,7 +1060,7 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
     data1.+=(("A", "L-12", 12000L))
     data1.+=(("A", "L-20", 20000L))
 
-    val data2 = new mutable.MutableList[(String, String, Long)]
+    val data2 = new mutable.ListBuffer[(String, String, Long)]
     data2.+=(("A", "R-5", 5000L))
     data2.+=(("A", "R-6", 6000L))
     data2.+=(("B", "R-7", 7000L))
@@ -1083,7 +1083,7 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
     result.addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList[String](
+    val expected = mutable.ListBuffer[String](
       "A,R-6,L-1",
       "A,R-6,L-2",
       "A,R-6,L-6",
@@ -1110,13 +1110,13 @@ class IntervalJoinITCase(mode: StateBackendMode) 
extends StreamingWithStateTestB
         |t2.rowtime + INTERVAL '4' SECOND
       """.stripMargin
 
-    val data1 = new mutable.MutableList[(String, String, Long)]
+    val data1 = new mutable.ListBuffer[(String, String, Long)]
     // for boundary test
     data1.+=(("A", "L-1", 1000L))
     data1.+=(("B", "L-4", 4000L))
     data1.+=(("C", "L-7", 7000L))
 
-    val data2 = new mutable.MutableList[(String, String, Long)]
+    val data2 = new mutable.ListBuffer[(String, String, Long)]
     data2.+=(("A", "R-6", 6000L))
     data2.+=(("B", "R-7", 7000L))
     data2.+=(("D", "R-8", 8000L))
@@ -1138,7 +1138,7 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
     result.addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList[String](
+    val expected = mutable.ListBuffer[String](
       "null,null,L-1",
       "null,null,L-4",
       "null,null,L-7",
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala
index d65c35b78d9..5042dd540ce 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala
@@ -132,7 +132,7 @@ class JoinITCase(miniBatch: MiniBatchMode, state: 
StateBackendMode, enableAsyncS
     env.execute()
 
     val expected = mutable
-      .MutableList(
+      .ListBuffer(
         "1,1,Hi,1,1,0,Hallo,1",
         "1,1,Hi,2,2,1,Hallo Welt,2",
         "1,1,Hi,2,3,2,Hallo Welt wie,1",
@@ -183,7 +183,7 @@ class JoinITCase(miniBatch: MiniBatchMode, state: StateBackendMode, enableAsyncS
     tEnv.sqlQuery(sqlQuery).toRetractStream[Row].addSink(sink).setParallelism(1)
     env.execute()
 
-    val expected = mutable.MutableList("1,2,hi a2,3,4,hi b1", "2,3,hi a3,4,5,null").toList
+    val expected = mutable.ListBuffer("1,2,hi a2,3,4,hi b1", "2,3,hi a3,4,5,null").toList
 
     assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted)
   }
@@ -191,7 +191,7 @@ class JoinITCase(miniBatch: MiniBatchMode, state: 
StateBackendMode, enableAsyncS
   /** test non-window inner join * */
   @TestTemplate
   def testNonWindowInnerJoin(): Unit = {
-    val data1 = new mutable.MutableList[(Int, Long, String)]
+    val data1 = new mutable.ListBuffer[(Int, Long, String)]
     data1.+=((1, 1L, "Hi1"))
     data1.+=((1, 2L, "Hi2"))
     data1.+=((1, 2L, "Hi2"))
@@ -201,7 +201,7 @@ class JoinITCase(miniBatch: MiniBatchMode, state: 
StateBackendMode, enableAsyncS
     data1.+=((1, 8L, "Hi8"))
     data1.+=((3, 8L, "Hi9"))
 
-    val data2 = new mutable.MutableList[(Int, Long, String)]
+    val data2 = new mutable.ListBuffer[(Int, Long, String)]
     data2.+=((1, 1L, "HiHi"))
     data2.+=((2, 2L, "HeHe"))
     data2.+=((3, 2L, "HeHe"))
@@ -228,7 +228,7 @@ class JoinITCase(miniBatch: MiniBatchMode, state: 
StateBackendMode, enableAsyncS
     tEnv.sqlQuery(sqlQuery).toDataStream.addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList(
+    val expected = mutable.ListBuffer(
       "1,HiHi,Hi2",
       "1,HiHi,Hi2",
       "1,HiHi,Hi3",
@@ -241,7 +241,7 @@ class JoinITCase(miniBatch: MiniBatchMode, state: 
StateBackendMode, enableAsyncS
 
   @TestTemplate
   def testIsNullInnerJoinWithNullCond(): Unit = {
-    val data1 = new mutable.MutableList[(Int, Long, String)]
+    val data1 = new mutable.ListBuffer[(Int, Long, String)]
     data1.+=((1, 1L, "Hi1"))
     data1.+=((1, 2L, "Hi2"))
     data1.+=((1, 2L, "Hi2"))
@@ -251,7 +251,7 @@ class JoinITCase(miniBatch: MiniBatchMode, state: 
StateBackendMode, enableAsyncS
     data1.+=((1, 8L, "Hi8"))
     data1.+=((3, 8L, "Hi9"))
 
-    val data2 = new mutable.MutableList[(Int, Long, String)]
+    val data2 = new mutable.ListBuffer[(Int, Long, String)]
     data2.+=((1, 1L, "HiHi"))
     data2.+=((2, 2L, "HeHe"))
     data2.+=((3, 2L, "HeHe"))
@@ -281,7 +281,7 @@ class JoinITCase(miniBatch: MiniBatchMode, state: 
StateBackendMode, enableAsyncS
     tEnv.sqlQuery(sqlQuery).toDataStream.addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList(
+    val expected = mutable.ListBuffer(
       "1,HiHi,Hi2",
       "1,HiHi,Hi2",
       "1,HiHi,Hi3",
@@ -359,7 +359,7 @@ class JoinITCase(miniBatch: MiniBatchMode, state: 
StateBackendMode, enableAsyncS
 
   @TestTemplate
   def testInnerJoinWithBooleanFilterCondition(): Unit = {
-    val data1 = new mutable.MutableList[(Int, Long, String, Boolean)]
+    val data1 = new mutable.ListBuffer[(Int, Long, String, Boolean)]
     data1.+=((1, 1L, "Hi", true))
     data1.+=((2, 2L, "Hello", false))
     data1.+=((3, 2L, "Hello world", true))
@@ -643,7 +643,7 @@ class JoinITCase(miniBatch: MiniBatchMode, state: 
StateBackendMode, enableAsyncS
   @TestTemplate
   def testThreeWayMultiJoinWithoutPk(): Unit = {
     env.setParallelism(1)
-    val data1 = new mutable.MutableList[(Int, Long)]
+    val data1 = new mutable.ListBuffer[(Int, Long)]
     data1.+=((1, 1L))
     data1.+=((1, 2L))
     data1.+=((1, 2L))
@@ -653,13 +653,13 @@ class JoinITCase(miniBatch: MiniBatchMode, state: 
StateBackendMode, enableAsyncS
     data1.+=((1, 8L))
     data1.+=((3, 8L))
 
-    val data2 = new mutable.MutableList[(Int, Long)]
+    val data2 = new mutable.ListBuffer[(Int, Long)]
     data2.+=((1, 1L))
     data2.+=((2, 2L))
     data2.+=((3, 2L))
     data2.+=((1, 4L))
 
-    val data3 = new mutable.MutableList[(Int, Long)]
+    val data3 = new mutable.ListBuffer[(Int, Long)]
     data3.+=((1, 1L))
     data3.+=((2, 2L))
     data3.+=((3, 2L))
@@ -1182,12 +1182,12 @@ class JoinITCase(miniBatch: MiniBatchMode, state: 
StateBackendMode, enableAsyncS
 
   @TestTemplate
   def testNullLeftOuterJoin(): Unit = {
-    val data1 = new mutable.MutableList[(Int, Long)]
+    val data1 = new mutable.ListBuffer[(Int, Long)]
     data1.+=((1, 1L))
     data1.+=((3, 8L))
     data1.+=((4, 2L))
 
-    val data2 = new mutable.MutableList[(Int, Long)]
+    val data2 = new mutable.ListBuffer[(Int, Long)]
     data2.+=((1, 1L))
     data2.+=((2, 2L))
     data2.+=((3, 2L))
@@ -1214,7 +1214,7 @@ class JoinITCase(miniBatch: MiniBatchMode, state: 
StateBackendMode, enableAsyncS
     
tEnv.sqlQuery(sqlQuery).toRetractStream[Row].addSink(sink).setParallelism(1)
     env.execute()
 
-    val expected = mutable.MutableList(
+    val expected = mutable.ListBuffer(
       "1,1,1,1",
       "4,2,null,null",
       "null,8,null,null"
@@ -1225,12 +1225,12 @@ class JoinITCase(miniBatch: MiniBatchMode, state: 
StateBackendMode, enableAsyncS
 
   @TestTemplate
   def testNullLeftOuterJoinWithNullCond(): Unit = {
-    val data1 = new mutable.MutableList[(Int, Long)]
+    val data1 = new mutable.ListBuffer[(Int, Long)]
     data1.+=((1, 1L))
     data1.+=((3, 8L))
     data1.+=((4, 2L))
 
-    val data2 = new mutable.MutableList[(Int, Long)]
+    val data2 = new mutable.ListBuffer[(Int, Long)]
     data2.+=((1, 1L))
     data2.+=((2, 2L))
     data2.+=((3, 2L))
@@ -1257,7 +1257,7 @@ class JoinITCase(miniBatch: MiniBatchMode, state: 
StateBackendMode, enableAsyncS
     
tEnv.sqlQuery(sqlQuery).toRetractStream[Row].addSink(sink).setParallelism(1)
     env.execute()
 
-    val expected = mutable.MutableList(
+    val expected = mutable.ListBuffer(
       "1,1,1,1",
       "4,2,null,null",
       "null,8,null,2"
@@ -1268,12 +1268,12 @@ class JoinITCase(miniBatch: MiniBatchMode, state: 
StateBackendMode, enableAsyncS
 
   @TestTemplate
   def testNullRightOuterJoin(): Unit = {
-    val data1 = new mutable.MutableList[(Int, Long)]
+    val data1 = new mutable.ListBuffer[(Int, Long)]
     data1.+=((1, 1L))
     data1.+=((3, 8L))
     data1.+=((4, 2L))
 
-    val data2 = new mutable.MutableList[(Int, Long)]
+    val data2 = new mutable.ListBuffer[(Int, Long)]
     data2.+=((1, 1L))
     data2.+=((2, 2L))
     data2.+=((3, 2L))
@@ -1299,7 +1299,7 @@ class JoinITCase(miniBatch: MiniBatchMode, state: 
StateBackendMode, enableAsyncS
     
tEnv.sqlQuery(sqlQuery).toRetractStream[Row].addSink(sink).setParallelism(1)
     env.execute()
 
-    val expected = mutable.MutableList(
+    val expected = mutable.ListBuffer(
       "1,1,1,1",
       "null,null,2,2",
       "null,null,null,2"
@@ -1310,12 +1310,12 @@ class JoinITCase(miniBatch: MiniBatchMode, state: 
StateBackendMode, enableAsyncS
 
   @TestTemplate
   def testNullRightOuterJoinWithNullCond(): Unit = {
-    val data1 = new mutable.MutableList[(Int, Long)]
+    val data1 = new mutable.ListBuffer[(Int, Long)]
     data1.+=((1, 1L))
     data1.+=((3, 8L))
     data1.+=((4, 2L))
 
-    val data2 = new mutable.MutableList[(Int, Long)]
+    val data2 = new mutable.ListBuffer[(Int, Long)]
     data2.+=((1, 1L))
     data2.+=((2, 2L))
     data2.+=((3, 2L))
@@ -1342,7 +1342,7 @@ class JoinITCase(miniBatch: MiniBatchMode, state: 
StateBackendMode, enableAsyncS
     
tEnv.sqlQuery(sqlQuery).toRetractStream[Row].addSink(sink).setParallelism(1)
     env.execute()
 
-    val expected = mutable.MutableList(
+    val expected = mutable.ListBuffer(
       "1,1,1,1",
       "null,null,2,2",
       "null,8,null,2"
@@ -1353,12 +1353,12 @@ class JoinITCase(miniBatch: MiniBatchMode, state: 
StateBackendMode, enableAsyncS
 
   @TestTemplate
   def testNullFullOuterJoin(): Unit = {
-    val data1 = new mutable.MutableList[(Int, Long)]
+    val data1 = new mutable.ListBuffer[(Int, Long)]
     data1.+=((1, 1L))
     data1.+=((3, 8L))
     data1.+=((4, 2L))
 
-    val data2 = new mutable.MutableList[(Int, Long)]
+    val data2 = new mutable.ListBuffer[(Int, Long)]
     data2.+=((1, 1L))
     data2.+=((2, 2L))
     data2.+=((3, 2L))
@@ -1385,7 +1385,7 @@ class JoinITCase(miniBatch: MiniBatchMode, state: 
StateBackendMode, enableAsyncS
     
tEnv.sqlQuery(sqlQuery).toRetractStream[Row].addSink(sink).setParallelism(1)
     env.execute()
 
-    val expected = mutable.MutableList(
+    val expected = mutable.ListBuffer(
       "1,1,1,1",
       "null,null,2,2",
       "4,2,null,null",
@@ -1398,12 +1398,12 @@ class JoinITCase(miniBatch: MiniBatchMode, state: 
StateBackendMode, enableAsyncS
 
   @TestTemplate
   def testNullFullOuterJoinWithNullCond(): Unit = {
-    val data1 = new mutable.MutableList[(Int, Long)]
+    val data1 = new mutable.ListBuffer[(Int, Long)]
     data1.+=((1, 1L))
     data1.+=((3, 8L))
     data1.+=((4, 2L))
 
-    val data2 = new mutable.MutableList[(Int, Long)]
+    val data2 = new mutable.ListBuffer[(Int, Long)]
     data2.+=((1, 1L))
     data2.+=((2, 2L))
     data2.+=((3, 2L))
@@ -1431,7 +1431,7 @@ class JoinITCase(miniBatch: MiniBatchMode, state: 
StateBackendMode, enableAsyncS
     
tEnv.sqlQuery(sqlQuery).toRetractStream[Row].addSink(sink).setParallelism(1)
     env.execute()
 
-    val expected = mutable.MutableList(
+    val expected = mutable.ListBuffer(
       "1,1,1,1",
       "null,null,2,2",
       "4,2,null,null",
@@ -1443,11 +1443,11 @@ class JoinITCase(miniBatch: MiniBatchMode, state: 
StateBackendMode, enableAsyncS
 
   @TestTemplate
   def testJoinWithoutWatermark(): Unit = {
-    val data1 = new mutable.MutableList[(Int, Long)]
+    val data1 = new mutable.ListBuffer[(Int, Long)]
     data1.+=((1, 1L))
     data1.+=((2, 2L))
     data1.+=((3, 3L))
-    val data2 = new mutable.MutableList[(Int, Long)]
+    val data2 = new mutable.ListBuffer[(Int, Long)]
     data2.+=((1, -1L))
     data2.+=((2, -2L))
     data2.+=((3, -3L))
@@ -1469,7 +1469,7 @@ class JoinITCase(miniBatch: MiniBatchMode, state: 
StateBackendMode, enableAsyncS
   def testBigDataOfJoin(): Unit = {
     env.setParallelism(1)
 
-    val data = new mutable.MutableList[(Int, Long, String)]
+    val data = new mutable.ListBuffer[(Int, Long, String)]
     for (i <- 0 until 500) {
       data.+=((i % 10, i, i.toString))
     }
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/MatchRecognizeITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/MatchRecognizeITCase.scala
index 0178ef32889..bae1710b51e 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/MatchRecognizeITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/MatchRecognizeITCase.scala
@@ -53,7 +53,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends 
StreamingWithState
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
 
-    val data = new mutable.MutableList[(Int, String)]
+    val data = new mutable.ListBuffer[(Int, String)]
     data.+=((1, "a"))
     data.+=((2, "z"))
     data.+=((3, "b"))
@@ -93,7 +93,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends 
StreamingWithState
     result.addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList("6,7,8")
+    val expected = mutable.ListBuffer("6,7,8")
     assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
   }
 
@@ -102,7 +102,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) 
extends StreamingWithState
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
 
-    val data = new mutable.MutableList[(Int, String, String)]
+    val data = new mutable.ListBuffer[(Int, String, String)]
     data.+=((1, "a", null))
     data.+=((2, "b", null))
     data.+=((3, "c", null))
@@ -143,7 +143,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) 
extends StreamingWithState
     result.addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList("1,null,3,null", "6,null,8,null")
+    val expected = mutable.ListBuffer("1,null,3,null", "6,null,8,null")
     assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
   }
 
@@ -155,7 +155,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) 
extends StreamingWithState
     //  However code split is not supported in planner yet.
     tEnv.getConfig.setMaxGeneratedCodeLength(1)
 
-    val data = new mutable.MutableList[(Int, String, String, String)]
+    val data = new mutable.ListBuffer[(Int, String, String, String)]
     data.+=((1, "a", "key1", "second_key3"))
     data.+=((2, "b", "key1", "second_key3"))
     data.+=((3, "c", "key1", "second_key3"))
@@ -198,7 +198,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) 
extends StreamingWithState
     result.addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList(
+    val expected = mutable.ListBuffer(
       "key1,second_key3,1,key1,2,3,second_key3",
       "key2,second_key4,6,key2,7,8,second_key4")
     assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
@@ -260,7 +260,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) 
extends StreamingWithState
     result.addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList("10,11,12")
+    val expected = mutable.ListBuffer("10,11,12")
     assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
   }
 
@@ -269,7 +269,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) 
extends StreamingWithState
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
 
-    val data = new mutable.MutableList[(String, Long, Int, Int)]
+    val data = new mutable.ListBuffer[(String, Long, Int, Int)]
     // first window
     data.+=(("ACME", Duration.ofSeconds(1).toMillis, 1, 1))
     data.+=(("ACME", Duration.ofSeconds(2).toMillis, 2, 2))
@@ -342,7 +342,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) 
extends StreamingWithState
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
 
-    val data = new mutable.MutableList[(String, Long, Int, Int)]
+    val data = new mutable.ListBuffer[(String, Long, Int, Int)]
     // first window
     data.+=(("ACME", Duration.ofSeconds(1).toMillis, 1, 1))
     data.+=(("ACME", Duration.ofSeconds(2).toMillis, 2, 2))
@@ -472,7 +472,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) 
extends StreamingWithState
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
 
-    val data = new mutable.MutableList[(String, Long, Int, Int)]
+    val data = new mutable.ListBuffer[(String, Long, Int, Int)]
     data.+=(("ACME", 1L, 19, 1))
     data.+=(("ACME", 2L, 17, 2))
     data.+=(("ACME", 3L, 13, 3))
@@ -523,7 +523,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) 
extends StreamingWithState
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
 
-    val data = new mutable.MutableList[(String, Long, Int, Int)]
+    val data = new mutable.ListBuffer[(String, Long, Int, Int)]
     data.+=(("ACME", 1L, 19, 1))
     data.+=(("ACME", 2L, 17, 2))
     data.+=(("ACME", 3L, 13, 3))
@@ -581,7 +581,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) 
extends StreamingWithState
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
 
-    val data = new mutable.MutableList[(Int, String, Long, Int)]
+    val data = new mutable.ListBuffer[(Int, String, Long, Int)]
     data.+=((1, "ACME", 1L, 20))
     data.+=((2, "ACME", 2L, 19))
     data.+=((3, "ACME", 3L, 18))
@@ -643,7 +643,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) 
extends StreamingWithState
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
 
-    val data = new mutable.MutableList[(String, Long, Int, Int)]
+    val data = new mutable.ListBuffer[(String, Long, Int, Int)]
     data.+=(("ACME", 1L, 19, 1))
     data.+=(("ACME", 2L, 17, 2))
     data.+=(("ACME", 3L, 13, 3))
@@ -696,7 +696,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) 
extends StreamingWithState
     val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
     tEnv.getConfig.setMaxGeneratedCodeLength(1)
 
-    val data = new mutable.MutableList[(Int, String, Long, Double, Int)]
+    val data = new mutable.ListBuffer[(Int, String, Long, Double, Int)]
     data.+=((1, "a", 1, 0.8, 1))
     data.+=((2, "z", 2, 0.8, 3))
     data.+=((3, "b", 1, 0.8, 2))
@@ -749,7 +749,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) 
extends StreamingWithState
     result.addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList("1,5,0,null,2,3,3.4,8", 
"9,4,0,null,3,4,3.2,12")
+    val expected = mutable.ListBuffer("1,5,0,null,2,3,3.4,8", 
"9,4,0,null,3,4,3.2,12")
     assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
   }
 
@@ -759,7 +759,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) 
extends StreamingWithState
     val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
     tEnv.getConfig.setMaxGeneratedCodeLength(1)
 
-    val data = new mutable.MutableList[Row]
+    val data = new mutable.ListBuffer[Row]
     data.+=(Row.of(Int.box(1), "a", Int.box(10)))
     data.+=(Row.of(Int.box(2), "z", Int.box(10)))
     data.+=(Row.of(Int.box(3), "b", null))
@@ -808,7 +808,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) 
extends StreamingWithState
     result.addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList("29,7,5,8,6,8")
+    val expected = mutable.ListBuffer("29,7,5,8,6,8")
     assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
   }
 
@@ -817,7 +817,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) 
extends StreamingWithState
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
 
-    val data = new mutable.MutableList[(Int, String)]
+    val data = new mutable.ListBuffer[(Int, String)]
     data.+=((1, "a"))
 
     val t =
@@ -860,7 +860,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) 
extends StreamingWithState
     val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
     tEnv.getConfig.setMaxGeneratedCodeLength(1)
 
-    val data = new mutable.MutableList[(Int, String, Long)]
+    val data = new mutable.ListBuffer[(Int, String, Long)]
     data.+=((1, "a", 1))
     data.+=((2, "a", 1))
     data.+=((3, "a", 1))
@@ -906,7 +906,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) 
extends StreamingWithState
     result.addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList("1,PREF:a,8,5", "7,PREF:a,6,9")
+    val expected = mutable.ListBuffer("1,PREF:a,8,5", "7,PREF:a,6,9")
     assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
   }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverAggregateITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverAggregateITCase.scala
index a1f60de7746..f98e72b1b4e 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverAggregateITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverAggregateITCase.scala
@@ -943,7 +943,7 @@ class OverAggregateITCase(mode: StateBackendMode, 
unboundedOverVersion: Int)
     tEnv.sqlQuery(sqlQuery).toDataStream.addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList(
+    val expected = mutable.ListBuffer(
       s"1,2,Hello,0,2,1,${2 / 1},2,2",
       s"1,3,Hello world,0,5,2,${5 / 2},3,2",
       s"1,1,Hi,0,6,3,${6 / 3},3,1",
@@ -1065,7 +1065,7 @@ class OverAggregateITCase(mode: StateBackendMode, 
unboundedOverVersion: Int)
     tEnv.sqlQuery(sqlQuery).toDataStream.addSink(sink).setParallelism(1)
     env.execute()
 
-    val expected = mutable.MutableList(
+    val expected = mutable.ListBuffer(
       s"2,2,Hello,2,1,${2 / 1},2,2",
       s"3,5,Hello,7,2,${7 / 2},5,2",
       s"1,3,Hello,10,3,${10 / 3},5,2",
@@ -1412,7 +1412,7 @@ class OverAggregateITCase(mode: StateBackendMode, 
unboundedOverVersion: Int)
 
   @TestTemplate
   def testDecimalSum0(): Unit = {
-    val data = new mutable.MutableList[Row]
+    val data = new mutable.ListBuffer[Row]
     data.+=(Row.of(BigDecimal(1.11).bigDecimal))
     data.+=(Row.of(BigDecimal(2.22).bigDecimal))
     data.+=(Row.of(BigDecimal(3.33).bigDecimal))
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SetOperatorsITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SetOperatorsITCase.scala
index 7354b742ab4..0aab39a8c8b 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SetOperatorsITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SetOperatorsITCase.scala
@@ -47,13 +47,13 @@ class SetOperatorsITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
     val sink = new TestingRetractSink
     
tEnv.sqlQuery(sqlQuery).toRetractStream[Row].addSink(sink).setParallelism(1)
     env.execute()
-    val expected = mutable.MutableList("1,1,Hi", "2,2,Hello", "3,2,Hello 
world")
+    val expected = mutable.ListBuffer("1,1,Hi", "2,2,Hello", "3,2,Hello world")
     assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted)
   }
 
   @TestTemplate
   def testExcept(): Unit = {
-    val data1 = new mutable.MutableList[(Int, Long, String)]
+    val data1 = new mutable.ListBuffer[(Int, Long, String)]
     data1.+=((1, 1L, "Hi1"))
     data1.+=((1, 2L, "Hi2"))
     data1.+=((1, 2L, "Hi2"))
@@ -63,7 +63,7 @@ class SetOperatorsITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
     data1.+=((1, 8L, "Hi8"))
     data1.+=((3, 8L, "Hi9"))
 
-    val data2 = new mutable.MutableList[(Int, Long, String)]
+    val data2 = new mutable.ListBuffer[(Int, Long, String)]
     data2.+=((1, 1L, "Hi1"))
     data2.+=((2, 2L, "Hi2"))
     data2.+=((3, 2L, "Hi3"))
@@ -78,7 +78,7 @@ class SetOperatorsITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
     val sink = new TestingRetractSink
     
tEnv.sqlQuery(sqlQuery).toRetractStream[Row].addSink(sink).setParallelism(1)
     env.execute()
-    val expected = mutable.MutableList(
+    val expected = mutable.ListBuffer(
       "Hi5",
       "Hi6",
       "Hi8",
@@ -99,7 +99,7 @@ class SetOperatorsITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
     val sink = new TestingRetractSink
     
tEnv.sqlQuery(sqlQuery).toRetractStream[Row].addSink(sink).setParallelism(1)
     env.execute()
-    val expected = mutable.MutableList("1", "2", "2")
+    val expected = mutable.ListBuffer("1", "2", "2")
     assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted)
   }
 
@@ -118,7 +118,7 @@ class SetOperatorsITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
     val sink = new TestingRetractSink
     
tEnv.sqlQuery(sqlQuery).toRetractStream[Row].addSink(sink).setParallelism(1)
     env.execute()
-    val expected = mutable.MutableList(
+    val expected = mutable.ListBuffer(
       "Hi",
       "Hello",
       "Hello",
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SortITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SortITCase.scala
index 6e3f09f0ff1..8ba436b52eb 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SortITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SortITCase.scala
@@ -37,7 +37,7 @@ class SortITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase(mode
   @TestTemplate
   def testDisableSortNonTemporalField(): Unit = {
     val sqlQuery = "SELECT * FROM a ORDER BY a2"
-    val data = new mutable.MutableList[(String, String)]
+    val data = new mutable.ListBuffer[(String, String)]
     data.+=(("0", "4"))
     data.+=(("3", "3"))
     data.+=(("1", "2"))
@@ -54,7 +54,7 @@ class SortITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase(mode
   @TestTemplate
   def testSort(): Unit = {
     val sqlQuery = "SELECT * FROM a ORDER BY a2"
-    val data = new mutable.MutableList[(String, String)]
+    val data = new mutable.ListBuffer[(String, String)]
     data.+=(("0", "4"))
     data.+=(("3", "3"))
     data.+=(("1", "2"))
@@ -79,7 +79,7 @@ class SortITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase(mode
   def testSortOrderByDesc(): Unit = {
     val sqlQuery = "SELECT * FROM a ORDER BY a1 DESC"
 
-    val data = new mutable.MutableList[(String, String)]
+    val data = new mutable.ListBuffer[(String, String)]
     data.+=(("0", "4"))
     data.+=(("3", "3"))
     data.+=(("1", "2"))
@@ -105,7 +105,7 @@ class SortITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase(mode
   def testSortOrderByMultipleFields(): Unit = {
     val sqlQuery = "SELECT * FROM a ORDER BY a1, a2"
 
-    val data = new mutable.MutableList[(String, String)]
+    val data = new mutable.ListBuffer[(String, String)]
     data.+=(("5", "1"))
     data.+=(("0", "4"))
     data.+=(("1", "7"))
@@ -130,7 +130,7 @@ class SortITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase(mode
   def testSortOrderByRepeatedFields(): Unit = {
     val sqlQuery = "SELECT * FROM a ORDER BY a1, a1"
 
-    val data = new mutable.MutableList[(String, String)]
+    val data = new mutable.ListBuffer[(String, String)]
     data.+=(("5", "1"))
     data.+=(("0", "4"))
     data.+=(("1", "7"))
@@ -155,7 +155,7 @@ class SortITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase(mode
   def testSortOrderByWithRetract(): Unit = {
     val sqlQuery = "SELECT a1, count(*) as c FROM a GROUP BY a1 ORDER BY c"
 
-    val data = new mutable.MutableList[(String, String)]
+    val data = new mutable.ListBuffer[(String, String)]
     data.+=(("1", "1"))
     data.+=(("2", "1"))
     data.+=(("3", "1"))
@@ -189,7 +189,7 @@ class SortITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase(mode
          |select * from a where a1 < all (select a1 * 2 from a) order by a1 
desc
        """.stripMargin
 
-    val data = new mutable.MutableList[(Int, Int)]
+    val data = new mutable.ListBuffer[(Int, Int)]
     data.+=((8, 1))
     data.+=((7, 2))
     data.+=((6, 3))
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
index 6e3b0ca78ec..e9b374c3645 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
@@ -147,7 +147,7 @@ class SplitAggregateITCase(
 
     val chars = List("A", "B", null, "D", "E", "F", "H", null, null, "K", "L", 
"L", "N", "O", "P")
 
-    val data = new mutable.MutableList[Row]
+    val data = new mutable.ListBuffer[Row]
 
     for (i <- ids.indices) {
       val v = integers(i)
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemITCaseBase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemITCaseBase.scala
index 748b5076fdb..402cdd69e89 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemITCaseBase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemITCaseBase.scala
@@ -57,7 +57,7 @@ abstract class StreamFileSystemITCaseBase extends 
StreamingTestBase with FileSys
 
   override def checkPredicate(sqlQuery: String, checkFunc: Row => Unit): Unit 
= {
     val result = tEnv.sqlQuery(sqlQuery).toDataStream
-    val sinkResults = new mutable.MutableList[Row]
+    val sinkResults = new mutable.ListBuffer[Row]
 
     val sink = new AbstractExactlyOnceSink[Row] {
       override def invoke(value: Row, context: SinkFunction.Context): Unit =
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSourceITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSourceITCase.scala
index d69e659b7f0..de355f9cff7 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSourceITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSourceITCase.scala
@@ -283,7 +283,7 @@ class TableSourceITCase extends StreamingTestBase {
 
   @Test
   def testBitmapDataType(): Unit = {
-    val data = new mutable.MutableList[Row]
+    val data = new mutable.ListBuffer[Row]
     val empty = Bitmap.empty()
     for (i <- 1 to 3) {
       val bitmap = Bitmap.fromArray(Array[Int](i, i + 1))
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalTableFunctionJoinITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalTableFunctionJoinITCase.scala
index 07343f4af68..a706c05939a 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalTableFunctionJoinITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalTableFunctionJoinITCase.scala
@@ -60,14 +60,14 @@ class TemporalTableFunctionJoinITCase(state: 
StateBackendMode)
         |WHERE r.currency = o.currency
         |""".stripMargin
 
-    val ordersData = new mutable.MutableList[(Long, String)]
+    val ordersData = new mutable.ListBuffer[(Long, String)]
     ordersData.+=((2L, "Euro"))
     ordersData.+=((1L, "US Dollar"))
     ordersData.+=((50L, "Yen"))
     ordersData.+=((3L, "Euro"))
     ordersData.+=((5L, "US Dollar"))
 
-    val ratesHistoryData = new mutable.MutableList[(String, Long)]
+    val ratesHistoryData = new mutable.ListBuffer[(String, Long)]
     ratesHistoryData.+=(("US Dollar", 102L))
     ratesHistoryData.+=(("Euro", 114L))
     ratesHistoryData.+=(("Yen", 1L))
@@ -122,14 +122,14 @@ class TemporalTableFunctionJoinITCase(state: 
StateBackendMode)
         |WHERE r.currency = o.currency
         |""".stripMargin
 
-    val ordersData = new mutable.MutableList[(Long, String)]
+    val ordersData = new mutable.ListBuffer[(Long, String)]
     ordersData.+=((2L, "Euro"))
     ordersData.+=((1L, "US Dollar"))
     ordersData.+=((50L, "Yen"))
     ordersData.+=((3L, "Euro"))
     ordersData.+=((5L, "US Dollar"))
 
-    val ratesHistoryData = new mutable.MutableList[(String, Long)]
+    val ratesHistoryData = new mutable.ListBuffer[(String, Long)]
     ratesHistoryData.+=(("US Dollar", 102L))
     ratesHistoryData.+=(("Euro", 114L))
     ratesHistoryData.+=(("Yen", 1L))
@@ -177,13 +177,13 @@ class TemporalTableFunctionJoinITCase(state: 
StateBackendMode)
         |WHERE r.currency = o.currency
         |""".stripMargin
 
-    val ordersData = new mutable.MutableList[(Long, String, Timestamp)]
+    val ordersData = new mutable.ListBuffer[(Long, String, Timestamp)]
     ordersData.+=((2L, "Euro", new Timestamp(2L)))
     ordersData.+=((1L, "US Dollar", new Timestamp(3L)))
     ordersData.+=((50L, "Yen", new Timestamp(4L)))
     ordersData.+=((3L, "Euro", new Timestamp(5L)))
 
-    val ratesHistoryData = new mutable.MutableList[(String, Long, Timestamp)]
+    val ratesHistoryData = new mutable.ListBuffer[(String, Long, Timestamp)]
     ratesHistoryData.+=(("US Dollar", 102L, new Timestamp(1L)))
     ratesHistoryData.+=(("Euro", 114L, new Timestamp(1L)))
     ratesHistoryData.+=(("Yen", 1L, new Timestamp(1L)))
@@ -244,7 +244,7 @@ class TemporalTableFunctionJoinITCase(state: 
StateBackendMode)
         |  r.currency = p.currency
         |""".stripMargin
 
-    val ordersData = new mutable.MutableList[(Long, String, Long, Timestamp)]
+    val ordersData = new mutable.ListBuffer[(Long, String, Long, Timestamp)]
     ordersData.+=((1L, "A1", 2L, new Timestamp(2L)))
     ordersData.+=((2L, "A2", 1L, new Timestamp(3L)))
     ordersData.+=((3L, "A4", 50L, new Timestamp(4L)))
@@ -254,7 +254,7 @@ class TemporalTableFunctionJoinITCase(state: 
StateBackendMode)
       .assignTimestampsAndWatermarks(new TimestampExtractor[(Long, String, 
Long, Timestamp)]())
       .toTable(tEnv, 'orderId, 'productId, 'amount, 'rowtime.rowtime)
 
-    val ratesHistoryData = new mutable.MutableList[(String, Long, Timestamp)]
+    val ratesHistoryData = new mutable.ListBuffer[(String, Long, Timestamp)]
     ratesHistoryData.+=(("US Dollar", 102L, new Timestamp(1L)))
     ratesHistoryData.+=(("Euro", 114L, new Timestamp(1L)))
     ratesHistoryData.+=(("Yen", 1L, new Timestamp(1L)))
@@ -265,7 +265,7 @@ class TemporalTableFunctionJoinITCase(state: 
StateBackendMode)
       .assignTimestampsAndWatermarks(new TimestampExtractor[(String, Long, 
Timestamp)]())
       .toTable(tEnv, 'currency, 'rate, 'rowtime.rowtime)
 
-    val pricesHistoryData = new mutable.MutableList[(String, String, Double, 
Timestamp)]
+    val pricesHistoryData = new mutable.ListBuffer[(String, String, Double, 
Timestamp)]
     pricesHistoryData.+=(("A2", "US Dollar", 10.2d, new Timestamp(1L)))
     pricesHistoryData.+=(("A1", "Euro", 11.4d, new Timestamp(1L)))
     pricesHistoryData.+=(("A4", "Yen", 1d, new Timestamp(1L)))
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TimestampITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TimestampITCase.scala
index 156b74df2c2..b9ec657a0d6 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TimestampITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TimestampITCase.scala
@@ -59,7 +59,7 @@ class TimestampITCase extends StreamingTestBase {
       Timestamp.valueOf("1973-01-01 00:00:00").toLocalDateTime
     )
 
-    val instants = new mutable.MutableList[Instant]
+    val instants = new mutable.ListBuffer[Instant]
     for (i <- datetimes.indices) {
       if (datetimes(i) == null) {
         instants += null
@@ -70,7 +70,7 @@ class TimestampITCase extends StreamingTestBase {
       }
     }
 
-    val data = new mutable.MutableList[Row]
+    val data = new mutable.ListBuffer[Row]
 
     for (i <- ints.indices) {
       data += row(ints(i), longs(i), datetimes(i), timestamps(i), instants(i))
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala
index 08e27c452cc..bf3595249eb 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala
@@ -62,13 +62,13 @@ class AggregateITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase
     t.toRetractStream[Row].addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList("1,10", "2,21", "3,12")
+    val expected = mutable.ListBuffer("1,10", "2,21", "3,12")
     assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted)
   }
 
   @TestTemplate
   def testMaxAggRetractWithCondition(): Unit = {
-    val data = new mutable.MutableList[(Int, Int)]
+    val data = new mutable.ListBuffer[(Int, Int)]
     data.+=((1, 10))
     data.+=((1, 10))
     data.+=((2, 5))
@@ -97,7 +97,7 @@ class AggregateITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase
 
   @TestTemplate
   def testMinAggRetractWithCondition(): Unit = {
-    val data = new mutable.MutableList[(Int, Int)]
+    val data = new mutable.ListBuffer[(Int, Int)]
     data.+=((1, 5))
     data.+=((2, 6))
     data.+=((1, 5))
@@ -135,13 +135,13 @@ class AggregateITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase
     t.toRetractStream[Row].addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList("1,3,3", "2,3,4", "3,4,4")
+    val expected = mutable.ListBuffer("1,3,3", "2,3,4", "3,4,4")
     assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted)
   }
 
   @TestTemplate
   def testDistinctAggregate(): Unit = {
-    val data = new mutable.MutableList[(Int, Int, String)]
+    val data = new mutable.ListBuffer[(Int, Int, String)]
     data.+=((1, 1, "A"))
     data.+=((2, 2, "B"))
     data.+=((2, 2, "B"))
@@ -170,7 +170,7 @@ class AggregateITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase
     t.toRetractStream[Row].addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList("A,2,5,1,1,1", "B,3,12,4,2,3", 
"C,2,9,4,3,4", "D,1,9,9,4,9")
+    val expected = mutable.ListBuffer("A,2,5,1,1,1", "B,3,12,4,2,3", 
"C,2,9,4,3,4", "D,1,9,9,4,9")
     assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted)
   }
 
@@ -185,7 +185,7 @@ class AggregateITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase
     t.toRetractStream[Row].addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList("1,4,5", "2,4,7", "3,2,3")
+    val expected = mutable.ListBuffer("1,4,5", "2,4,7", "3,2,3")
     assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted)
   }
 
@@ -198,7 +198,7 @@ class AggregateITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase
 //    t.toRetractStream[Row].addSink(sink)
 //    env.execute()
 //
-//    val expected = mutable.MutableList("Hi,Hello world,Hi#Hello#Hello world")
+//    val expected = mutable.ListBuffer("Hi,Hello world,Hi#Hello#Hello world")
 //    assertEquals(expected.sorted, sink.getRetractResults.sorted)
 //  }
 
@@ -213,7 +213,7 @@ class AggregateITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase
     t.toRetractStream[Row].addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList("1,null", "2,null", "3,null", "4,null", 
"5,null", "6,null")
+    val expected = mutable.ListBuffer("1,null", "2,null", "3,null", "4,null", 
"5,null", "6,null")
     assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted)
   }
 
@@ -229,7 +229,7 @@ class AggregateITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase
     t.toRetractStream[Row].addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList("1,5", "2,7", "3,3")
+    val expected = mutable.ListBuffer("1,5", "2,7", "3,3")
     assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted)
   }
 
@@ -314,7 +314,7 @@ class AggregateITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase
     t.toRetractStream[Row].addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList(
+    val expected = mutable.ListBuffer(
       s"0,1,1,1,1,0,Hallo",
       s"1,2,3,3,2,13,Hallo Welt-ABC-JKL",
       s"12,3,5,1,13,12,IJK",
@@ -350,7 +350,7 @@ class AggregateITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase
 
   @TestTemplate
   def testGroupAggregateWithStateBackend(): Unit = {
-    val data = new mutable.MutableList[(Int, Long, String)]
+    val data = new mutable.ListBuffer[(Int, Long, String)]
     data.+=((1, 1L, "A"))
     data.+=((2, 2L, "B"))
     data.+=((3, 2L, "B"))
@@ -384,7 +384,7 @@ class AggregateITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase
 
   @TestTemplate
   def testRemoveDuplicateRecordsWithUpsertSink(): Unit = {
-    val data = new mutable.MutableList[(Int, Long, String)]
+    val data = new mutable.ListBuffer[(Int, Long, String)]
     data.+=((1, 1L, "A"))
     data.+=((2, 2L, "B"))
     data.+=((3, 2L, "B"))
@@ -449,7 +449,7 @@ class AggregateITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase
 
   @TestTemplate
   def testGroupAggregateWithDataView(): Unit = {
-    val data = new mutable.MutableList[(Int, Long, String)]
+    val data = new mutable.ListBuffer[(Int, Long, String)]
     data.+=((1, 1L, "A"))
     data.+=((2, 2L, "B"))
     data.+=((3, 2L, "B"))
@@ -495,13 +495,13 @@ class AggregateITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase
     results.addSink(sink).setParallelism(1)
     env.execute()
 
-    val expected = mutable.MutableList("1,1", "2,3", "3,6", "4,10", "5,15", 
"6,21")
+    val expected = mutable.ListBuffer("1,1", "2,3", "3,6", "4,10", "5,15", 
"6,21")
     assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted)
   }
 
   @TestTemplate
   def testPrecisionForSumAggregationOnDecimal(): Unit = {
-    val data = new mutable.MutableList[(Double, Double, Double, Double)]
+    val data = new mutable.ListBuffer[(Double, Double, Double, Double)]
     data.+=((1.03520274, 12345.035202748654, 12.345678901234567, 1.11111111))
     data.+=((0, 0, 0, 1.11111111))
     val t = failingDataSource(data).toTable(tEnv, 'a, 'b, 'c, 'd)
@@ -526,7 +526,7 @@ class AggregateITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase
 
   @TestTemplate
   def testPrecisionForSum0AggregationOnDecimal(): Unit = {
-    val data = new mutable.MutableList[(Double, Double, Double, Double)]
+    val data = new mutable.ListBuffer[(Double, Double, Double, Double)]
     data.+=((1.03520274, 12345.035202748654, 12.345678901234567, 1.11111111))
     data.+=((0, 0, 0, 1.11111111))
     val t = failingDataSource(data).toTable(tEnv, 'a, 'b, 'c, 'd)
@@ -551,7 +551,7 @@ class AggregateITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase
 
   @TestTemplate
   def testPrecisionForAvgAggregationOnDecimal(): Unit = {
-    val data = new mutable.MutableList[(Double, Double, Double, Double)]
+    val data = new mutable.ListBuffer[(Double, Double, Double, Double)]
     data.+=((1.03520274, 12345.035202748654, 12.345678901234567, 1.11111111))
     data.+=((0, 0, 0, 2.22222222))
     val t = failingDataSource(data).toTable(tEnv, 'a, 'b, 'c, 'd)
@@ -609,7 +609,7 @@ class AggregateITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase
 
   @TestTemplate
   def testBitmapBuildAgg(): Unit = {
-    val data = new mutable.MutableList[(Int, Int, String)]
+    val data = new mutable.ListBuffer[(Int, Int, String)]
     for (i <- 0 until 5) {
       data.+=((i, -i, "a"))
       data.+=((i * 2, -i * 2, "b"))
@@ -638,7 +638,7 @@ class AggregateITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase
 
   @TestTemplate
   def testBitmapLogicalOpsAgg(): Unit = {
-    val data = new mutable.MutableList[(Int, Int, Int, String)]
+    val data = new mutable.ListBuffer[(Int, Int, Int, String)]
     data.+=((-3, 5, 0, "a"))
     data.+=((7, 2, 5, "b"))
     data.+=((-3, 8, -8, "c"))
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/CalcITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/CalcITCase.scala
index 8742e704be9..ee455bbe8d6 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/CalcITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/CalcITCase.scala
@@ -64,7 +64,7 @@ class CalcITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase(mode
     result.toDataStream.addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList("Hello,5", "Hello world,11")
+    val expected = mutable.ListBuffer("Hello,5", "Hello world,11")
     assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
   }
 
@@ -78,7 +78,7 @@ class CalcITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase(mode
     ds.toDataStream.addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList("1,1,Hi", "2,2,Hello", "3,2,Hello 
world")
+    val expected = mutable.ListBuffer("1,1,Hi", "2,2,Hello", "3,2,Hello world")
     assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
   }
 
@@ -94,7 +94,7 @@ class CalcITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase(mode
     ds.toRetractStream[Row].addSink(sink).setParallelism(1)
     env.execute()
 
-    val expected = mutable.MutableList("3")
+    val expected = mutable.ListBuffer("3")
     assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted)
   }
 
@@ -111,7 +111,7 @@ class CalcITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase(mode
       .addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList("1,1,one", "2,2,two", "3,3,three")
+    val expected = mutable.ListBuffer("1,1,one", "2,2,two", "3,3,three")
     assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
   }
 
@@ -126,7 +126,7 @@ class CalcITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase(mode
     ds.toDataStream.addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList("1", "2", "3")
+    val expected = mutable.ListBuffer("1", "2", "3")
     assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
   }
 
@@ -144,7 +144,7 @@ class CalcITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase(mode
     ds.toDataStream.addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList(
+    val expected = mutable.ListBuffer(
       "1,1",
       "2,2",
       "3,2",
@@ -180,7 +180,7 @@ class CalcITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase(mode
     ds.toDataStream.addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList("1,1,Hi", "2,2,Hello", "3,2,Hello 
world")
+    val expected = mutable.ListBuffer("1,1,Hi", "2,2,Hello", "3,2,Hello world")
     assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
   }
 
@@ -197,7 +197,7 @@ class CalcITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase(mode
     ds.filter('a === 3).toDataStream.addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList("3,2,Hello world")
+    val expected = mutable.ListBuffer("3,2,Hello world")
     assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
   }
 
@@ -230,7 +230,7 @@ class CalcITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase(mode
     ds.filter(true).toDataStream.addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList("1,1,Hi", "2,2,Hello", "3,2,Hello 
world")
+    val expected = mutable.ListBuffer("1,1,Hi", "2,2,Hello", "3,2,Hello world")
     assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
   }
 
@@ -250,7 +250,7 @@ class CalcITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase(mode
     filterDs.toDataStream.addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList(
+    val expected = mutable.ListBuffer(
       "4,3,Hello world, how are you?",
       "6,3,Luke Skywalker",
       "8,4,Comment#2",
@@ -275,7 +275,7 @@ class CalcITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase(mode
     val sink = new TestingAppendSink
     filterDs.toDataStream.addSink(sink)
     env.execute()
-    val expected = mutable.MutableList(
+    val expected = mutable.ListBuffer(
       "7,4,Comment#1",
       "9,4,Comment#3",
       "11,5,Comment#5",
@@ -301,7 +301,7 @@ class CalcITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase(mode
     ds.toDataStream.addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList("Hello")
+    val expected = mutable.ListBuffer("Hello")
     assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
   }
 
@@ -324,13 +324,13 @@ class CalcITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase(mode
     result.toDataStream.addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList("Hello", "Hello world")
+    val expected = mutable.ListBuffer("Hello", "Hello world")
     assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
   }
 
   @TestTemplate
   def testScalarFunctionConstructorWithParams(): Unit = {
-    val testData = new mutable.MutableList[(Int, Long, String)]
+    val testData = new mutable.ListBuffer[(Int, Long, String)]
     testData.+=((1, 1L, "Jack#22"))
     testData.+=((2, 2L, "John#19"))
     testData.+=((3, 2L, "Anna#44"))
@@ -349,7 +349,7 @@ class CalcITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase(mode
     result.toDataStream.addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList(
+    val expected = mutable.ListBuffer(
       "default-Anna#44,Sunny-Anna#44,kevin2-Anna#44",
       "default-Jack#22,Sunny-Jack#22,kevin2-Jack#22",
       "default-John#19,Sunny-John#19,kevin2-John#19",
@@ -371,7 +371,7 @@ class CalcITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase(mode
     result.toDataStream.addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList(
+    val expected = mutable.ListBuffer(
       ">>1",
       ">>2",
       ">>3",
@@ -390,7 +390,7 @@ class CalcITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase(mode
     result.toDataStream.addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList(
+    val expected = mutable.ListBuffer(
       ">>1",
       ">>2",
       ">>3",
@@ -420,7 +420,7 @@ class CalcITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase(mode
 
         env.execute()
 
-        val expected = mutable.MutableList(
+        val expected = mutable.ListBuffer(
           ">>1",
           ">>2",
           ">>3",
@@ -516,7 +516,7 @@ class CalcITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase(mode
     ds.toDataStream.addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList(
+    val expected = mutable.ListBuffer(
       "{10=Comment#4}",
       "{11=Comment#5}",
       "{12=Comment#6}",
@@ -544,7 +544,7 @@ class CalcITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase(mode
 
   @TestTemplate
   def testColumnOperation(): Unit = {
-    val testData = new mutable.MutableList[(Int, Long, String)]
+    val testData = new mutable.ListBuffer[(Int, Long, String)]
     testData.+=((1, 1L, "Kevin"))
     testData.+=((2, 2L, "Sunny"))
 
@@ -577,7 +577,7 @@ class CalcITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase(mode
     result.toDataStream.addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList(
+    val expected = mutable.ListBuffer(
       "1,Kevin is a kid,1,str,last,3",
       "2,Sunny is a kid,1,str,last,4"
     )
@@ -599,7 +599,7 @@ class CalcITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase(mode
     ds.toDataStream.addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList("3", "4", "5")
+    val expected = mutable.ListBuffer("3", "4", "5")
     assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
   }
 
@@ -618,7 +618,7 @@ class CalcITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase(mode
     ds.toDataStream.addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList("3", "4", "5")
+    val expected = mutable.ListBuffer("3", "4", "5")
     assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
   }
 
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/CorrelateITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/CorrelateITCase.scala
index 469b2d7beff..62fe71cd4f5 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/CorrelateITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/CorrelateITCase.scala
@@ -64,7 +64,7 @@ class CorrelateITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase
     result.addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList("Jack#22,Jack,22", "Anna#44,Anna,44")
+    val expected = mutable.ListBuffer("Jack#22,Jack,22", "Anna#44,Anna,44")
     assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
   }
 
@@ -83,7 +83,7 @@ class CorrelateITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase
       .addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList(
+    val expected = mutable.ListBuffer(
       "nosharp,null,null",
       "Jack#22,Jack,22",
       "John#19,John,19",
@@ -129,7 +129,7 @@ class CorrelateITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase
     result.addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList("Jack#22,Jack,22", "John#19,John,19")
+    val expected = mutable.ListBuffer("Jack#22,Jack,22", "John#19,John,19")
     assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
   }
 
@@ -148,7 +148,7 @@ class CorrelateITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase
     result.toDataStream.addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList("3,Hello", "3,world")
+    val expected = mutable.ListBuffer("3,Hello", "3,world")
     assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
   }
 
@@ -172,7 +172,7 @@ class CorrelateITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase
     env.execute()
 
     val expected =
-      mutable.MutableList("1,Hi", "1,test", "2,Hello", "2,test", "3,Hello 
world", "3,test")
+      mutable.ListBuffer("1,Hi", "1,test", "2,Hello", "2,test", "3,Hello 
world", "3,test")
     assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
   }
 
@@ -197,7 +197,7 @@ class CorrelateITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase
     result.addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList(
+    val expected = mutable.ListBuffer(
       "Anna#44,Anna,OneConf_Anna,TwoConf__key=key1_value=value1_Anna,44,44,44",
       "Anna#44,Anna,OneConf_Anna,TwoConf__key=key2_value=value2_Anna,44,44,44",
       "Jack#22,Jack,OneConf_Jack,TwoConf__key=key1_value=value1_Jack,22,22,22",
@@ -222,7 +222,7 @@ class CorrelateITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase
     result.toDataStream.addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList(
+    val expected = mutable.ListBuffer(
       "Anna#44,1",
       "Anna#44,2",
       "Anna#44,Anna#44",
@@ -247,7 +247,7 @@ class CorrelateITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase
     result1.toDataStream.addSink(sink1)
     env.execute()
 
-    val expected1 = mutable.MutableList(
+    val expected1 = mutable.ListBuffer(
       "Anna#44,1",
       "Anna#44,2",
       "Jack#22,1",
@@ -290,7 +290,7 @@ class CorrelateITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase
     result.toDataStream.addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList("1,2,3,3", "1,2,3,3")
+    val expected = mutable.ListBuffer("1,2,3,3", "1,2,3,3")
     assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
   }
 
@@ -354,7 +354,7 @@ class CorrelateITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase
     ds.toDataStream.addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList("Jack,4", "22,2", "John,4", "19,2", 
"Anna,4", "44,2")
+    val expected = mutable.ListBuffer("Jack,4", "22,2", "John,4", "19,2", 
"Anna,4", "44,2")
     assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
   }
 
@@ -381,7 +381,7 @@ class CorrelateITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase
 
   private def testData(env: StreamExecutionEnvironment): DataStream[(Int, 
Long, String)] = {
 
-    val data = new mutable.MutableList[(Int, Long, String)]
+    val data = new mutable.ListBuffer[(Int, Long, String)]
     data.+=((1, 1L, "Jack#22"))
     data.+=((2, 2L, "John#19"))
     data.+=((3, 2L, "Anna#44"))
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/OverAggregateITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/OverAggregateITCase.scala
index be55b4add39..afe33ef4321 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/OverAggregateITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/OverAggregateITCase.scala
@@ -202,7 +202,7 @@ class OverAggregateITCase(mode: StateBackendMode) extends 
StreamingWithStateTest
     windowedTable.toDataStream.addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList(
+    val expected = mutable.ListBuffer(
       "1,1,Hello,6,SUM:6,3,4,4,[2, 3],2,3,1,1,2,2",
       "1,2,Hello,6,SUM:6,3,4,4,[2, 3],2,3,1,1,2,2",
       "1,3,Hello world,6,SUM:6,3,4,4,[2, 3],2,3,1,1,2,2",
@@ -502,7 +502,7 @@ class OverAggregateITCase(mode: StateBackendMode) extends 
StreamingWithStateTest
     windowedTable.toDataStream.addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList(
+    val expected = mutable.ListBuffer(
       "1,0,0,1",
       "2,1,1,1",
       "2,3,1,2",
@@ -563,7 +563,7 @@ class OverAggregateITCase(mode: StateBackendMode) extends 
StreamingWithStateTest
     windowedTable.toDataStream.addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList(
+    val expected = mutable.ListBuffer(
       "1,0,0,1",
       "2,1,1,1",
       "2,3,1,2",
@@ -627,7 +627,7 @@ class OverAggregateITCase(mode: StateBackendMode) extends 
StreamingWithStateTest
     windowedTable.toDataStream.addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList(
+    val expected = mutable.ListBuffer(
       "Hello,1,1,1,1",
       "Hello,1,2,2,1",
       "Hello,1,3,3,1",
@@ -703,7 +703,7 @@ class OverAggregateITCase(mode: StateBackendMode) extends 
StreamingWithStateTest
     windowedTable.toDataStream.addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList(
+    val expected = mutable.ListBuffer(
       "Hello,1,1,1,1",
       "Hello,15,2,2,1",
       "Hello,16,3,3,1",
@@ -758,7 +758,7 @@ class OverAggregateITCase(mode: StateBackendMode) extends 
StreamingWithStateTest
     windowedTable.toDataStream.addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList(
+    val expected = mutable.ListBuffer(
       "Hello World,20,2,2",
       "Hello World,7,1,1",
       "Hello,1,1,1",
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/SetOperatorsITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/SetOperatorsITCase.scala
index a649658b1f5..aa8625636e6 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/SetOperatorsITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/SetOperatorsITCase.scala
@@ -51,7 +51,7 @@ class SetOperatorsITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
     unionDs.toDataStream.addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList("Hi", "Hello", "Hello world", "Hi", 
"Hello", "Hello world")
+    val expected = mutable.ListBuffer("Hi", "Hello", "Hello world", "Hi", 
"Hello", "Hello world")
     assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
   }
 
@@ -66,7 +66,7 @@ class SetOperatorsITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
     unionDs.toDataStream.addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList("Hi", "Hallo")
+    val expected = mutable.ListBuffer("Hi", "Hallo")
     assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
   }
 
@@ -97,7 +97,7 @@ class SetOperatorsITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
       .addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList("1,{}", "2,{}", "3,{}", "4,{}")
+    val expected = mutable.ListBuffer("1,{}", "2,{}", "3,{}", "4,{}")
     assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
   }
 
@@ -118,7 +118,7 @@ class SetOperatorsITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
       .addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList("1,1,a", "2,2,b", "3,3,c", "4,4,d")
+    val expected = mutable.ListBuffer("1,1,a", "2,2,b", "3,3,c", "4,4,d")
     assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
   }
 
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/CollectionBatchExecTable.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/CollectionBatchExecTable.scala
index 9dcd5af7d58..1b36cc8ab2b 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/CollectionBatchExecTable.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/CollectionBatchExecTable.scala
@@ -32,7 +32,7 @@ import scala.util.Random
  */
 object CollectionBatchExecTable {
   def get3TupleDataSet(env: TableEnvironment, fields: String = null): Table = {
-    val data = new mutable.MutableList[(Int, Long, String)]
+    val data = new mutable.ListBuffer[(Int, Long, String)]
     data.+=((1, 1L, "Hi"))
     data.+=((2, 2L, "Hello"))
     data.+=((3, 2L, "Hello world"))
@@ -58,7 +58,7 @@ object CollectionBatchExecTable {
   }
 
   def getSmall3TupleDataSet(env: TableEnvironment, fields: String = null): 
Table = {
-    val data = new mutable.MutableList[(Int, Long, String)]
+    val data = new mutable.ListBuffer[(Int, Long, String)]
     data.+=((1, 1L, "Hi"))
     data.+=((2, 2L, "Hello"))
     data.+=((3, 2L, "Hello world"))
@@ -66,7 +66,7 @@ object CollectionBatchExecTable {
   }
 
   def get5TupleDataSet(env: TableEnvironment, fields: String = null): Table = {
-    val data = new mutable.MutableList[(Int, Long, Int, String, Long)]
+    val data = new mutable.ListBuffer[(Int, Long, Int, String, Long)]
     data.+=((1, 1L, 0, "Hallo", 1L))
     data.+=((2, 2L, 1, "Hallo Welt", 2L))
     data.+=((2, 3L, 2, "Hallo Welt wie", 1L))
@@ -86,7 +86,7 @@ object CollectionBatchExecTable {
   }
 
   def getSmall5TupleDataSet(env: TableEnvironment, fields: String = null): 
Table = {
-    val data = new mutable.MutableList[(Int, Long, Int, String, Long)]
+    val data = new mutable.ListBuffer[(Int, Long, Int, String, Long)]
     data.+=((1, 1L, 0, "Hallo", 1L))
     data.+=((2, 2L, 1, "Hallo Welt", 2L))
     data.+=((2, 3L, 2, "Hallo Welt wie", 1L))
@@ -94,7 +94,7 @@ object CollectionBatchExecTable {
   }
 
   def getSmallNestedTupleDataSet(env: TableEnvironment, fields: String = 
null): Table = {
-    val data = new mutable.MutableList[((Int, Int), String)]
+    val data = new mutable.ListBuffer[((Int, Int), String)]
     data.+=(((1, 1), "one"))
     data.+=(((2, 2), "two"))
     data.+=(((3, 3), "three"))
@@ -102,7 +102,7 @@ object CollectionBatchExecTable {
   }
 
   def getGroupSortedNestedTupleDataSet(env: TableEnvironment, fields: String = 
null): Table = {
-    val data = new mutable.MutableList[((Int, Int), String)]
+    val data = new mutable.ListBuffer[((Int, Int), String)]
     data.+=(((1, 3), "a"))
     data.+=(((1, 2), "a"))
     data.+=(((2, 1), "a"))
@@ -114,7 +114,7 @@ object CollectionBatchExecTable {
   }
 
   def getStringDataSet(env: TableEnvironment, fields: String = null): Table = {
-    val data = new mutable.MutableList[String]
+    val data = new mutable.ListBuffer[String]
     data.+=("Hi")
     data.+=("Hello")
     data.+=("Hello world")
@@ -127,7 +127,7 @@ object CollectionBatchExecTable {
   }
 
   def getIntDataSet(env: TableEnvironment, fields: String = null): Table = {
-    val data = new mutable.MutableList[Int]
+    val data = new mutable.ListBuffer[Int]
     data.+=(1)
     data.+=(2)
     data.+=(2)
@@ -147,7 +147,7 @@ object CollectionBatchExecTable {
   }
 
   def getCustomTypeDataSet(env: TableEnvironment, fields: String = null): 
Table = {
-    val data = new mutable.MutableList[CustomType]
+    val data = new mutable.ListBuffer[CustomType]
     data.+=(new CustomType(1, 0L, "Hi"))
     data.+=(new CustomType(2, 1L, "Hello"))
     data.+=(new CustomType(2, 2L, "Hello world"))
@@ -173,7 +173,7 @@ object CollectionBatchExecTable {
   }
 
   def getSmallCustomTypeDataSet(env: TableEnvironment, fields: String = null): 
Table = {
-    val data = new mutable.MutableList[CustomType]
+    val data = new mutable.ListBuffer[CustomType]
     data.+=(new CustomType(1, 0L, "Hi"))
     data.+=(new CustomType(2, 1L, "Hello"))
     data.+=(new CustomType(2, 2L, "Hello world"))
@@ -181,7 +181,7 @@ object CollectionBatchExecTable {
   }
 
   def getSmallTuplebasedPojoMatchingDataSet(env: TableEnvironment, fields: 
String = null): Table = {
-    val data = new mutable.MutableList[(Int, String, Int, Int, Long, String, 
Long)]
+    val data = new mutable.ListBuffer[(Int, String, Int, Int, Long, String, 
Long)]
     data.+=((1, "First", 10, 100, 1000L, "One", 10000L))
     data.+=((2, "Second", 20, 200, 2000L, "Two", 20000L))
     data.+=((3, "Third", 30, 300, 3000L, "Three", 30000L))
@@ -189,7 +189,7 @@ object CollectionBatchExecTable {
   }
 
   def getSmallPojoDataSet(env: TableEnvironment, fields: String = null): Table 
= {
-    val data = new mutable.MutableList[POJO]
+    val data = new mutable.ListBuffer[POJO]
     data.+=(new POJO(1, "First", 10, 100, 1000L, "One", 10000L))
     data.+=(new POJO(2, "Second", 20, 200, 2000L, "Two", 20000L))
     data.+=(new POJO(3, "Third", 30, 300, 3000L, "Three", 30000L))
@@ -197,7 +197,7 @@ object CollectionBatchExecTable {
   }
 
   def getDuplicatePojoDataSet(env: TableEnvironment, fields: String = null): 
Table = {
-    val data = new mutable.MutableList[POJO]
+    val data = new mutable.ListBuffer[POJO]
     data.+=(new POJO(1, "First", 10, 100, 1000L, "One", 10000L))
     data.+=(new POJO(1, "First", 10, 100, 1000L, "One", 10000L))
     data.+=(new POJO(1, "First", 10, 100, 1000L, "One", 10000L))
@@ -210,7 +210,7 @@ object CollectionBatchExecTable {
   }
 
   def getCrazyNestedDataSet(env: TableEnvironment, fields: String = null): 
Table = {
-    val data = new mutable.MutableList[CrazyNested]
+    val data = new mutable.ListBuffer[CrazyNested]
     data.+=(new CrazyNested("aa"))
     data.+=(new CrazyNested("bb"))
     data.+=(new CrazyNested("bb"))
@@ -221,7 +221,7 @@ object CollectionBatchExecTable {
   }
 
   def getTupleContainingPojos(env: TableEnvironment, fields: String = null): 
Table = {
-    val data = new mutable.MutableList[(Int, CrazyNested, POJO)]
+    val data = new mutable.ListBuffer[(Int, CrazyNested, POJO)]
     data.+=(
       (1, new CrazyNested("one", "uno", 1L), new POJO(1, "First", 10, 100, 
1000L, "One", 10000L)))
     data.+=(
@@ -234,7 +234,7 @@ object CollectionBatchExecTable {
   }
 
   def getMixedPojoDataSet(env: TableEnvironment, fields: String = null): Table 
= {
-    val data = new mutable.MutableList[POJO]
+    val data = new mutable.ListBuffer[POJO]
     data.+=(new POJO(1, "First", 10, 100, 1000L, "One", 10100L))
     data.+=(new POJO(2, "First_", 10, 105, 1000L, "One", 10200L))
     data.+=(new POJO(3, "First", 11, 102, 3000L, "One", 10200L))
@@ -247,7 +247,7 @@ object CollectionBatchExecTable {
   }
 
   def getSmallTuplebasedDataSetMatchingPojo(env: TableEnvironment, fields: 
String = null): Table = {
-    val data = new mutable.MutableList[(Long, Integer, Integer, Long, String, 
Integer, String)]
+    val data = new mutable.ListBuffer[(Long, Integer, Integer, Long, String, 
Integer, String)]
     data.+=((10000L, 10, 100, 1000L, "One", 1, "First"))
     data.+=((20000L, 20, 200, 2000L, "Two", 2, "Second"))
     data.+=((30000L, 30, 300, 3000L, "Three", 3, "Third"))
@@ -255,7 +255,7 @@ object CollectionBatchExecTable {
   }
 
   def getPojoWithMultiplePojos(env: TableEnvironment, fields: String = null): 
Table = {
-    val data = new 
mutable.MutableList[CollectionBatchExecTable.PojoWithMultiplePojos]
+    val data = new 
mutable.ListBuffer[CollectionBatchExecTable.PojoWithMultiplePojos]
     data.+=(new CollectionBatchExecTable.PojoWithMultiplePojos("a", "aa", "b", 
"bb", 1))
     data.+=(new CollectionBatchExecTable.PojoWithMultiplePojos("b", "bb", "c", 
"cc", 2))
     data.+=(new CollectionBatchExecTable.PojoWithMultiplePojos("b", "bb", "c", 
"cc", 2))
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingWithStateTestBase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingWithStateTestBase.scala
index abe98309354..13218c6456d 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingWithStateTestBase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingWithStateTestBase.scala
@@ -87,7 +87,7 @@ class StreamingWithStateTestBase(state: StateBackendMode) 
extends StreamingTestB
   /** Creates a BinaryRowData DataStream from the given non-empty [[Seq]]. */
   def failingBinaryRowSource[T: TypeInformation](data: Seq[T]): 
DataStream[RowData] = {
     val typeInfo = 
implicitly[TypeInformation[_]].asInstanceOf[CompositeType[_]]
-    val result = new mutable.MutableList[RowData]
+    val result = new mutable.ListBuffer[RowData]
     val reuse = new BinaryRowData(typeInfo.getArity)
     val writer = new BinaryRowWriter(reuse)
     data.foreach {
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TestData.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TestData.scala
index 1b9e8ba2b7e..08eeed489c3 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TestData.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TestData.scala
@@ -159,7 +159,7 @@ object TestData {
   val nullablesOfNullData5 = Array(true, true, true, true, true)
 
   lazy val smallTupleData3: Seq[(Int, Long, String)] = {
-    val data = new mutable.MutableList[(Int, Long, String)]
+    val data = new mutable.ListBuffer[(Int, Long, String)]
     data.+=((1, 1L, "Hi"))
     data.+=((2, 2L, "Hello"))
     data.+=((3, 2L, "Hello world"))
@@ -171,7 +171,7 @@ object TestData {
   val nullablesOfSmallData3 = Array(true, true, true)
 
   lazy val smallTupleData5: Seq[(Int, Long, Int, String, Long)] = {
-    val data = new mutable.MutableList[(Int, Long, Int, String, Long)]
+    val data = new mutable.ListBuffer[(Int, Long, Int, String, Long)]
     data.+=((1, 1L, 0, "Hallo", 1L))
     data.+=((2, 2L, 1, "Hallo Welt", 2L))
     data.+=((2, 3L, 2, "Hallo Welt wie", 1L))
@@ -237,7 +237,7 @@ object TestData {
   )
 
   lazy val tupleData2: Seq[(Int, Double)] = {
-    val data = new mutable.MutableList[(Int, Double)]
+    val data = new mutable.ListBuffer[(Int, Double)]
     data.+=((1, 0.1))
     data.+=((2, 0.2))
     data.+=((2, 0.2))
@@ -256,7 +256,7 @@ object TestData {
   }
 
   lazy val tupleData3: Seq[(Int, Long, String)] = {
-    val data = new mutable.MutableList[(Int, Long, String)]
+    val data = new mutable.ListBuffer[(Int, Long, String)]
     data.+=((1, 1L, "Hi"))
     data.+=((2, 2L, "Hello"))
     data.+=((3, 2L, "Hello world"))
@@ -331,7 +331,7 @@ object TestData {
   )
 
   lazy val smallNestedTupleData: Seq[((Int, Int), String)] = {
-    val data = new mutable.MutableList[((Int, Int), String)]
+    val data = new mutable.ListBuffer[((Int, Int), String)]
     data.+=(((1, 1), "one"))
     data.+=(((2, 2), "two"))
     data.+=(((3, 3), "three"))
@@ -388,7 +388,7 @@ object TestData {
   }
 
   lazy val tupleData5: Seq[(Int, Long, Int, String, Long)] = {
-    val data = new mutable.MutableList[(Int, Long, Int, String, Long)]
+    val data = new mutable.ListBuffer[(Int, Long, Int, String, Long)]
     data.+=((1, 1L, 0, "Hallo", 1L))
     data.+=((2, 2L, 1, "Hallo Welt", 2L))
     data.+=((2, 3L, 2, "Hallo Welt wie", 1L))
@@ -1408,7 +1408,7 @@ object TestData {
       LocalDateTime.of(2020, 5, 1, 23, 23, 23, 0),
       null
     )
-    val instants = new mutable.MutableList[Instant]
+    val instants = new mutable.ListBuffer[Instant]
     for (i <- datetimes.indices) {
       if (datetimes(i) == null) {
         instants += null
@@ -1427,7 +1427,7 @@ object TestData {
     val maps =
       List(map(("k1", 1)), map(("k2", 2), ("k4", 4)), map(("k3", null)), 
map((null, 3)), null)
 
-    val data = new mutable.MutableList[Row]
+    val data = new mutable.ListBuffer[Row]
     for (i <- ints.indices) {
       data += row(
         bools(i),

Reply via email to