This is an automated email from the ASF dual-hosted git repository.

hxb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c86dcadf53 [FLINK-33778][table] Cleanup usage of deprecated 
TableConfig#setIdleStateRetentionTime
3c86dcadf53 is described below

commit 3c86dcadf5366fa0026125051d69b0a8913d5e61
Author: Jacky Lau <liuyon...@gmail.com>
AuthorDate: Mon Dec 18 19:41:03 2023 +0800

    [FLINK-33778][table] Cleanup usage of deprecated 
TableConfig#setIdleStateRetentionTime
    
    This closes #23894.
---
 .../plan/rules/physical/stream/ChangelogModeInferenceTest.scala     | 4 +++-
 .../table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala   | 3 ++-
 .../flink/table/planner/plan/stream/sql/agg/GroupWindowTest.scala   | 4 ++--
 .../table/planner/plan/stream/sql/agg/TwoStageAggregateTest.scala   | 4 +++-
 .../table/planner/plan/stream/table/TwoStageAggregateTest.scala     | 2 +-
 .../flink/table/planner/runtime/stream/sql/AggregateITCase.scala    | 3 ++-
 .../flink/table/planner/runtime/stream/table/AggregateITCase.scala  | 4 +++-
 .../table/planner/runtime/stream/table/GroupWindowITCase.scala      | 5 +++--
 .../runtime/stream/table/GroupWindowTableAggregateITCase.scala      | 5 +++--
 .../flink/table/planner/runtime/stream/table/JoinITCase.scala       | 6 ++++--
 .../table/planner/runtime/stream/table/TableAggregateITCase.scala   | 4 +++-
 .../table/planner/runtime/utils/StreamingWithAggTestBase.scala      | 3 ++-
 12 files changed, 31 insertions(+), 16 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.scala
index a40028bd357..9b5c3e01544 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.scala
@@ -26,6 +26,8 @@ import 
org.apache.flink.table.planner.utils.{AggregatePhaseStrategy, TableTestBa
 
 import org.junit.jupiter.api.{BeforeEach, Test}
 
+import java.time.Duration
+
 /** Tests for [[FlinkChangelogModeInferenceProgram]]. */
 class ChangelogModeInferenceTest extends TableTestBase {
 
@@ -152,7 +154,7 @@ class ChangelogModeInferenceTest extends TableTestBase {
   @Test
   def testTwoLevelGroupByLocalGlobalOn(): Unit = {
     util.enableMiniBatch()
-    util.tableEnv.getConfig.setIdleStateRetentionTime(Time.hours(1), 
Time.hours(2))
+    util.tableEnv.getConfig.setIdleStateRetention(Duration.ofHours(1))
     util.tableEnv.getConfig.set(
       OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
       AggregatePhaseStrategy.TWO_PHASE.toString)
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
index 9850e1afe9c..c8e4420634e 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
@@ -28,6 +28,7 @@ import 
org.apache.flink.testutils.junit.extensions.parameterized.{ParameterizedT
 import org.junit.jupiter.api.{BeforeEach, TestTemplate}
 import org.junit.jupiter.api.extension.ExtendWith
 
+import java.time.Duration
 import java.util
 
 @ExtendWith(Array(classOf[ParameterizedTestExtension]))
@@ -41,7 +42,7 @@ class DistinctAggregateTest(
 
   @BeforeEach
   def before(): Unit = {
-    util.tableEnv.getConfig.setIdleStateRetentionTime(Time.hours(1), 
Time.hours(2))
+    util.tableEnv.getConfig.setIdleStateRetention(Duration.ofHours(1))
     util.enableMiniBatch()
     util.tableEnv.getConfig
       .set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, 
aggPhaseEnforcer.toString)
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/GroupWindowTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/GroupWindowTest.scala
index 1a74d0b4158..810ab987867 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/GroupWindowTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/GroupWindowTest.scala
@@ -438,7 +438,7 @@ class GroupWindowTest extends TableTestBase {
   def testWindowAggregateWithLateFire(): Unit = {
     util.tableConfig.set(TABLE_EXEC_EMIT_LATE_FIRE_ENABLED, Boolean.box(true))
     util.tableConfig.set(TABLE_EXEC_EMIT_LATE_FIRE_DELAY, 
Duration.ofSeconds(5))
-    util.tableConfig.setIdleStateRetentionTime(Time.hours(1), Time.hours(2))
+    util.tableConfig.setIdleStateRetention(Duration.ofHours(1))
     val sql =
       """
         |SELECT TUMBLE_START(`rowtime`, INTERVAL '1' SECOND), COUNT(*) cnt
@@ -482,7 +482,7 @@ class GroupWindowTest extends TableTestBase {
 
   @Test
   def testWindowAggregateWithAllowLatenessOnly(): Unit = {
-    util.tableConfig.setIdleStateRetentionTime(Time.hours(1), Time.hours(2))
+    util.tableConfig.setIdleStateRetention(Duration.ofHours(1))
     val sql =
       """
         |SELECT TUMBLE_START(`rowtime`, INTERVAL '1' SECOND), COUNT(*) cnt
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/TwoStageAggregateTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/TwoStageAggregateTest.scala
index 4832e8198ec..cc2d10c6edd 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/TwoStageAggregateTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/TwoStageAggregateTest.scala
@@ -25,6 +25,8 @@ import 
org.apache.flink.table.planner.utils.{AggregatePhaseStrategy, TableTestBa
 
 import org.junit.jupiter.api.{BeforeEach, Test}
 
+import java.time.Duration
+
 class TwoStageAggregateTest extends TableTestBase {
 
   private val util = streamTestUtil()
@@ -33,7 +35,7 @@ class TwoStageAggregateTest extends TableTestBase {
   @BeforeEach
   def before(): Unit = {
     util.enableMiniBatch()
-    util.tableEnv.getConfig.setIdleStateRetentionTime(Time.hours(1), 
Time.hours(2))
+    util.tableEnv.getConfig.setIdleStateRetention(Duration.ofHours(1))
     util.tableEnv.getConfig.set(
       OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
       AggregatePhaseStrategy.TWO_PHASE.toString)
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TwoStageAggregateTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TwoStageAggregateTest.scala
index a732907df6c..f9fcbd23c6f 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TwoStageAggregateTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TwoStageAggregateTest.scala
@@ -34,7 +34,7 @@ class TwoStageAggregateTest extends TableTestBase {
   def before(): Unit = {
     util = streamTestUtil()
     util.tableEnv.getConfig
-      .setIdleStateRetentionTime(Time.hours(1), Time.hours(2))
+      .setIdleStateRetention(Duration.ofHours(1))
     util.tableEnv.getConfig
       .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
Duration.ofSeconds(1))
       .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
Boolean.box(true))
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
index c2783437f06..99d5e2a5f26 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
@@ -49,6 +49,7 @@ import org.junit.jupiter.api.extension.ExtendWith
 
 import java.lang.{Integer => JInt, Long => JLong}
 import java.math.{BigDecimal => JBigDecimal}
+import java.time.Duration
 
 import scala.collection.{mutable, Seq}
 import scala.math.BigDecimal.double2bigDecimal
@@ -1589,7 +1590,7 @@ class AggregateITCase(aggMode: AggMode, miniBatch: 
MiniBatchMode, backend: State
   def testGenericTypesWithoutStateClean(): Unit = {
     // because we don't provide a way to disable state cleanup.
     // TODO verify all tests with state cleanup closed.
-    tEnv.getConfig.setIdleStateRetentionTime(Time.days(0), Time.days(0))
+    tEnv.getConfig.setIdleStateRetention(Duration.ofDays(0))
     val t = failingDataSource(Seq(1, 2, 3)).toTable(tEnv, 'a)
     val results = t
       .select(new GenericAggregateFunction()('a))
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala
index ecfedd1fd35..7de4c64c02a 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala
@@ -36,6 +36,8 @@ import org.assertj.core.api.Assertions.assertThat
 import org.junit.jupiter.api.{BeforeEach, TestTemplate}
 import org.junit.jupiter.api.extension.ExtendWith
 
+import java.time.Duration
+
 import scala.collection.mutable
 
 /** Tests of groupby (without window) aggregations */
@@ -45,7 +47,7 @@ class AggregateITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase
   @BeforeEach
   override def before(): Unit = {
     super.before()
-    tEnv.getConfig.setIdleStateRetentionTime(Time.hours(1), Time.hours(2))
+    tEnv.getConfig.setIdleStateRetention(Duration.ofHours(1))
   }
 
   @TestTemplate
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/GroupWindowITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/GroupWindowITCase.scala
index 921aeedca0a..42989af2ba8 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/GroupWindowITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/GroupWindowITCase.scala
@@ -34,6 +34,7 @@ import org.junit.jupiter.api.TestTemplate
 import org.junit.jupiter.api.extension.ExtendWith
 
 import java.math.BigDecimal
+import java.time.Duration
 
 /**
  * We only test some aggregations until better testing of constructed 
DataStream programs is
@@ -62,7 +63,7 @@ class GroupWindowITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBa
 
   @TestTemplate
   def testProcessingTimeSlidingGroupWindowOverCount(): Unit = {
-    tEnv.getConfig.setIdleStateRetentionTime(Time.hours(1), Time.hours(2))
+    tEnv.getConfig.setIdleStateRetention(Duration.ofHours(1))
     val stream = failingDataSource(data)
     val table = stream.toTable(tEnv, 'long, 'int, 'string, 'proctime.proctime)
 
@@ -135,7 +136,7 @@ class GroupWindowITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBa
 
   @TestTemplate
   def testAllProcessingTimeTumblingGroupWindowOverCount(): Unit = {
-    tEnv.getConfig.setIdleStateRetentionTime(Time.hours(1), Time.hours(2))
+    tEnv.getConfig.setIdleStateRetention(Duration.ofHours(1))
     val stream = failingDataSource(data)
     val table = stream.toTable(tEnv, 'long, 'int, 'string, 'proctime.proctime)
     val countFun = new CountAggFunction
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/GroupWindowTableAggregateITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/GroupWindowTableAggregateITCase.scala
index 0603fd3ebca..08b4837257a 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/GroupWindowTableAggregateITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/GroupWindowTableAggregateITCase.scala
@@ -34,6 +34,7 @@ import org.junit.jupiter.api.TestTemplate
 import org.junit.jupiter.api.extension.ExtendWith
 
 import java.math.BigDecimal
+import java.time.Duration
 
 @ExtendWith(Array(classOf[ParameterizedTestExtension]))
 class GroupWindowTableAggregateITCase(mode: StateBackendMode)
@@ -59,7 +60,7 @@ class GroupWindowTableAggregateITCase(mode: StateBackendMode)
 
   @TestTemplate
   def testProcessingTimeSlidingGroupWindowOverCount(): Unit = {
-    tEnv.getConfig.setIdleStateRetentionTime(Time.hours(1), Time.hours(2))
+    tEnv.getConfig.setIdleStateRetention(Duration.ofHours(1))
     val stream = failingDataSource(tupleData3)
     val table = stream.toTable(tEnv, 'int, 'long, 'string, 'proctime.proctime)
 
@@ -126,7 +127,7 @@ class GroupWindowTableAggregateITCase(mode: 
StateBackendMode)
 
   @TestTemplate
   def testAllProcessingTimeTumblingGroupWindowOverCount(): Unit = {
-    tEnv.getConfig.setIdleStateRetentionTime(Time.hours(1), Time.hours(2))
+    tEnv.getConfig.setIdleStateRetention(Duration.ofHours(1))
     val stream = failingDataSource(tupleData3)
     val table = stream.toTable(tEnv, 'int, 'long, 'string, 'proctime.proctime)
     val top3 = new Top3
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/JoinITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/JoinITCase.scala
index fdcc1578bbd..6aa4635971d 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/JoinITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/JoinITCase.scala
@@ -36,6 +36,8 @@ import org.assertj.core.api.Assertions.assertThat
 import org.junit.jupiter.api.{BeforeEach, Disabled, TestTemplate}
 import org.junit.jupiter.api.extension.ExtendWith
 
+import java.time.Duration
+
 @ExtendWith(Array(classOf[ParameterizedTestExtension]))
 class JoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase(mode) {
 
@@ -81,7 +83,7 @@ class JoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase(mode
   @BeforeEach
   override def before(): Unit = {
     super.before()
-    tEnv.getConfig.setIdleStateRetentionTime(Time.hours(1), Time.hours(2))
+    tEnv.getConfig.setIdleStateRetention(Duration.ofHours(1))
   }
 
   @TestTemplate
@@ -1374,7 +1376,7 @@ class JoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase(mode
       .groupBy('bb)
       .select('bb, 'c.count.as('c))
 
-    tEnv.getConfig.setIdleStateRetentionTime(Time.hours(1), Time.hours(2))
+    tEnv.getConfig.setIdleStateRetention(Duration.ofHours(1))
 
     val t = leftTableWithPk
       .join(rightTableWithPk, 'b === 'bb)
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableAggregateITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableAggregateITCase.scala
index 0adaf24dcb2..18d11d732f4 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableAggregateITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableAggregateITCase.scala
@@ -33,6 +33,8 @@ import org.assertj.core.api.Assertions.{assertThat, 
assertThatThrownBy}
 import org.junit.jupiter.api.{BeforeEach, TestTemplate}
 import org.junit.jupiter.api.extension.ExtendWith
 
+import java.time.Duration
+
 /** Tests of groupby (without window) table aggregations */
 @ExtendWith(Array(classOf[ParameterizedTestExtension]))
 class TableAggregateITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase(mode) {
@@ -40,7 +42,7 @@ class TableAggregateITCase(mode: StateBackendMode) extends 
StreamingWithStateTes
   @BeforeEach
   override def before(): Unit = {
     super.before()
-    tEnv.getConfig.setIdleStateRetentionTime(Time.hours(1), Time.hours(2))
+    tEnv.getConfig.setIdleStateRetention(Duration.ofHours(1))
   }
 
   @TestTemplate
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingWithAggTestBase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingWithAggTestBase.scala
index c21860cfbaf..56aed40dd09 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingWithAggTestBase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingWithAggTestBase.scala
@@ -27,6 +27,7 @@ import 
org.apache.flink.testutils.junit.extensions.parameterized.Parameters
 
 import org.junit.jupiter.api.BeforeEach
 
+import java.time.Duration
 import java.util
 
 import scala.collection.JavaConversions._
@@ -41,7 +42,7 @@ class StreamingWithAggTestBase(
   override def before(): Unit = {
     super.before()
     // in order to cover more code paths
-    tEnv.getConfig.setIdleStateRetentionTime(Time.hours(1), Time.hours(2))
+    tEnv.getConfig.setIdleStateRetention(Duration.ofHours(1))
     if (aggMode.isLocalAggEnabled) {
       tEnv.getConfig.set(
         OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,

Reply via email to