This is an automated email from the ASF dual-hosted git repository. hxb pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 3c86dcadf53 [FLINK-33778][table] Cleanup usage of deprecated TableConfig#setIdleStateRetentionTime 3c86dcadf53 is described below commit 3c86dcadf5366fa0026125051d69b0a8913d5e61 Author: Jacky Lau <liuyon...@gmail.com> AuthorDate: Mon Dec 18 19:41:03 2023 +0800 [FLINK-33778][table] Cleanup usage of deprecated TableConfig#setIdleStateRetentionTime This closes #23894. --- .../plan/rules/physical/stream/ChangelogModeInferenceTest.scala | 4 +++- .../table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala | 3 ++- .../flink/table/planner/plan/stream/sql/agg/GroupWindowTest.scala | 4 ++-- .../table/planner/plan/stream/sql/agg/TwoStageAggregateTest.scala | 4 +++- .../table/planner/plan/stream/table/TwoStageAggregateTest.scala | 2 +- .../flink/table/planner/runtime/stream/sql/AggregateITCase.scala | 3 ++- .../flink/table/planner/runtime/stream/table/AggregateITCase.scala | 4 +++- .../table/planner/runtime/stream/table/GroupWindowITCase.scala | 5 +++-- .../runtime/stream/table/GroupWindowTableAggregateITCase.scala | 5 +++-- .../flink/table/planner/runtime/stream/table/JoinITCase.scala | 6 ++++-- .../table/planner/runtime/stream/table/TableAggregateITCase.scala | 4 +++- .../table/planner/runtime/utils/StreamingWithAggTestBase.scala | 3 ++- 12 files changed, 31 insertions(+), 16 deletions(-) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.scala index a40028bd357..9b5c3e01544 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.scala @@ -26,6 +26,8 @@ import org.apache.flink.table.planner.utils.{AggregatePhaseStrategy, TableTestBa import org.junit.jupiter.api.{BeforeEach, Test} +import java.time.Duration + /** Tests for [[FlinkChangelogModeInferenceProgram]]. */ class ChangelogModeInferenceTest extends TableTestBase { @@ -152,7 +154,7 @@ class ChangelogModeInferenceTest extends TableTestBase { @Test def testTwoLevelGroupByLocalGlobalOn(): Unit = { util.enableMiniBatch() - util.tableEnv.getConfig.setIdleStateRetentionTime(Time.hours(1), Time.hours(2)) + util.tableEnv.getConfig.setIdleStateRetention(Duration.ofHours(1)) util.tableEnv.getConfig.set( OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, AggregatePhaseStrategy.TWO_PHASE.toString) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala index 9850e1afe9c..c8e4420634e 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala @@ -28,6 +28,7 @@ import org.apache.flink.testutils.junit.extensions.parameterized.{ParameterizedT import org.junit.jupiter.api.{BeforeEach, TestTemplate} import org.junit.jupiter.api.extension.ExtendWith +import java.time.Duration import java.util @ExtendWith(Array(classOf[ParameterizedTestExtension])) @@ -41,7 +42,7 @@ class DistinctAggregateTest( @BeforeEach def before(): Unit = { - util.tableEnv.getConfig.setIdleStateRetentionTime(Time.hours(1), Time.hours(2)) + util.tableEnv.getConfig.setIdleStateRetention(Duration.ofHours(1)) util.enableMiniBatch() util.tableEnv.getConfig .set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, aggPhaseEnforcer.toString) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/GroupWindowTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/GroupWindowTest.scala index 1a74d0b4158..810ab987867 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/GroupWindowTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/GroupWindowTest.scala @@ -438,7 +438,7 @@ class GroupWindowTest extends TableTestBase { def testWindowAggregateWithLateFire(): Unit = { util.tableConfig.set(TABLE_EXEC_EMIT_LATE_FIRE_ENABLED, Boolean.box(true)) util.tableConfig.set(TABLE_EXEC_EMIT_LATE_FIRE_DELAY, Duration.ofSeconds(5)) - util.tableConfig.setIdleStateRetentionTime(Time.hours(1), Time.hours(2)) + util.tableConfig.setIdleStateRetention(Duration.ofHours(1)) val sql = """ |SELECT TUMBLE_START(`rowtime`, INTERVAL '1' SECOND), COUNT(*) cnt @@ -482,7 +482,7 @@ class GroupWindowTest extends TableTestBase { @Test def testWindowAggregateWithAllowLatenessOnly(): Unit = { - util.tableConfig.setIdleStateRetentionTime(Time.hours(1), Time.hours(2)) + util.tableConfig.setIdleStateRetention(Duration.ofHours(1)) val sql = """ |SELECT TUMBLE_START(`rowtime`, INTERVAL '1' SECOND), COUNT(*) cnt diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/TwoStageAggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/TwoStageAggregateTest.scala index 4832e8198ec..cc2d10c6edd 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/TwoStageAggregateTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/TwoStageAggregateTest.scala @@ -25,6 +25,8 @@ import org.apache.flink.table.planner.utils.{AggregatePhaseStrategy, TableTestBa import org.junit.jupiter.api.{BeforeEach, Test} +import java.time.Duration + class TwoStageAggregateTest extends TableTestBase { private val util = streamTestUtil() @@ -33,7 +35,7 @@ class TwoStageAggregateTest extends TableTestBase { @BeforeEach def before(): Unit = { util.enableMiniBatch() - util.tableEnv.getConfig.setIdleStateRetentionTime(Time.hours(1), Time.hours(2)) + util.tableEnv.getConfig.setIdleStateRetention(Duration.ofHours(1)) util.tableEnv.getConfig.set( OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, AggregatePhaseStrategy.TWO_PHASE.toString) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TwoStageAggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TwoStageAggregateTest.scala index a732907df6c..f9fcbd23c6f 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TwoStageAggregateTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TwoStageAggregateTest.scala @@ -34,7 +34,7 @@ class TwoStageAggregateTest extends TableTestBase { def before(): Unit = { util = streamTestUtil() util.tableEnv.getConfig - .setIdleStateRetentionTime(Time.hours(1), Time.hours(2)) + .setIdleStateRetention(Duration.ofHours(1)) util.tableEnv.getConfig .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1)) .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true)) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala index c2783437f06..99d5e2a5f26 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala @@ -49,6 +49,7 @@ import org.junit.jupiter.api.extension.ExtendWith import java.lang.{Integer => JInt, Long => JLong} import java.math.{BigDecimal => JBigDecimal} +import java.time.Duration import scala.collection.{mutable, Seq} import scala.math.BigDecimal.double2bigDecimal @@ -1589,7 +1590,7 @@ class AggregateITCase(aggMode: AggMode, miniBatch: MiniBatchMode, backend: State def testGenericTypesWithoutStateClean(): Unit = { // because we don't provide a way to disable state cleanup. // TODO verify all tests with state cleanup closed. - tEnv.getConfig.setIdleStateRetentionTime(Time.days(0), Time.days(0)) + tEnv.getConfig.setIdleStateRetention(Duration.ofDays(0)) val t = failingDataSource(Seq(1, 2, 3)).toTable(tEnv, 'a) val results = t .select(new GenericAggregateFunction()('a)) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala index ecfedd1fd35..7de4c64c02a 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala @@ -36,6 +36,8 @@ import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.{BeforeEach, TestTemplate} import org.junit.jupiter.api.extension.ExtendWith +import java.time.Duration + import scala.collection.mutable /** Tests of groupby (without window) aggregations */ @@ -45,7 +47,7 @@ class AggregateITCase(mode: StateBackendMode) extends StreamingWithStateTestBase @BeforeEach override def before(): Unit = { super.before() - tEnv.getConfig.setIdleStateRetentionTime(Time.hours(1), Time.hours(2)) + tEnv.getConfig.setIdleStateRetention(Duration.ofHours(1)) } @TestTemplate diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/GroupWindowITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/GroupWindowITCase.scala index 921aeedca0a..42989af2ba8 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/GroupWindowITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/GroupWindowITCase.scala @@ -34,6 +34,7 @@ import org.junit.jupiter.api.TestTemplate import org.junit.jupiter.api.extension.ExtendWith import java.math.BigDecimal +import java.time.Duration /** * We only test some aggregations until better testing of constructed DataStream programs is @@ -62,7 +63,7 @@ class GroupWindowITCase(mode: StateBackendMode) extends StreamingWithStateTestBa @TestTemplate def testProcessingTimeSlidingGroupWindowOverCount(): Unit = { - tEnv.getConfig.setIdleStateRetentionTime(Time.hours(1), Time.hours(2)) + tEnv.getConfig.setIdleStateRetention(Duration.ofHours(1)) val stream = failingDataSource(data) val table = stream.toTable(tEnv, 'long, 'int, 'string, 'proctime.proctime) @@ -135,7 +136,7 @@ class GroupWindowITCase(mode: StateBackendMode) extends StreamingWithStateTestBa @TestTemplate def testAllProcessingTimeTumblingGroupWindowOverCount(): Unit = { - tEnv.getConfig.setIdleStateRetentionTime(Time.hours(1), Time.hours(2)) + tEnv.getConfig.setIdleStateRetention(Duration.ofHours(1)) val stream = failingDataSource(data) val table = stream.toTable(tEnv, 'long, 'int, 'string, 'proctime.proctime) val countFun = new CountAggFunction diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/GroupWindowTableAggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/GroupWindowTableAggregateITCase.scala index 0603fd3ebca..08b4837257a 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/GroupWindowTableAggregateITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/GroupWindowTableAggregateITCase.scala @@ -34,6 +34,7 @@ import org.junit.jupiter.api.TestTemplate import org.junit.jupiter.api.extension.ExtendWith import java.math.BigDecimal +import java.time.Duration @ExtendWith(Array(classOf[ParameterizedTestExtension])) class GroupWindowTableAggregateITCase(mode: StateBackendMode) @@ -59,7 +60,7 @@ class GroupWindowTableAggregateITCase(mode: StateBackendMode) @TestTemplate def testProcessingTimeSlidingGroupWindowOverCount(): Unit = { - tEnv.getConfig.setIdleStateRetentionTime(Time.hours(1), Time.hours(2)) + tEnv.getConfig.setIdleStateRetention(Duration.ofHours(1)) val stream = failingDataSource(tupleData3) val table = stream.toTable(tEnv, 'int, 'long, 'string, 'proctime.proctime) @@ -126,7 +127,7 @@ class GroupWindowTableAggregateITCase(mode: StateBackendMode) @TestTemplate def testAllProcessingTimeTumblingGroupWindowOverCount(): Unit = { - tEnv.getConfig.setIdleStateRetentionTime(Time.hours(1), Time.hours(2)) + tEnv.getConfig.setIdleStateRetention(Duration.ofHours(1)) val stream = failingDataSource(tupleData3) val table = stream.toTable(tEnv, 'int, 'long, 'string, 'proctime.proctime) val top3 = new Top3 diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/JoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/JoinITCase.scala index fdcc1578bbd..6aa4635971d 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/JoinITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/JoinITCase.scala @@ -36,6 +36,8 @@ import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.{BeforeEach, Disabled, TestTemplate} import org.junit.jupiter.api.extension.ExtendWith +import java.time.Duration + @ExtendWith(Array(classOf[ParameterizedTestExtension])) class JoinITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode) { @@ -81,7 +83,7 @@ class JoinITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode @BeforeEach override def before(): Unit = { super.before() - tEnv.getConfig.setIdleStateRetentionTime(Time.hours(1), Time.hours(2)) + tEnv.getConfig.setIdleStateRetention(Duration.ofHours(1)) } @TestTemplate @@ -1374,7 +1376,7 @@ class JoinITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode .groupBy('bb) .select('bb, 'c.count.as('c)) - tEnv.getConfig.setIdleStateRetentionTime(Time.hours(1), Time.hours(2)) + tEnv.getConfig.setIdleStateRetention(Duration.ofHours(1)) val t = leftTableWithPk .join(rightTableWithPk, 'b === 'bb) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableAggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableAggregateITCase.scala index 0adaf24dcb2..18d11d732f4 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableAggregateITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableAggregateITCase.scala @@ -33,6 +33,8 @@ import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy} import org.junit.jupiter.api.{BeforeEach, TestTemplate} import org.junit.jupiter.api.extension.ExtendWith +import java.time.Duration + /** Tests of groupby (without window) table aggregations */ @ExtendWith(Array(classOf[ParameterizedTestExtension])) class TableAggregateITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode) { @@ -40,7 +42,7 @@ class TableAggregateITCase(mode: StateBackendMode) extends StreamingWithStateTes @BeforeEach override def before(): Unit = { super.before() - tEnv.getConfig.setIdleStateRetentionTime(Time.hours(1), Time.hours(2)) + tEnv.getConfig.setIdleStateRetention(Duration.ofHours(1)) } @TestTemplate diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingWithAggTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingWithAggTestBase.scala index c21860cfbaf..56aed40dd09 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingWithAggTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingWithAggTestBase.scala @@ -27,6 +27,7 @@ import org.apache.flink.testutils.junit.extensions.parameterized.Parameters import org.junit.jupiter.api.BeforeEach +import java.time.Duration import java.util import scala.collection.JavaConversions._ @@ -41,7 +42,7 @@ class StreamingWithAggTestBase( override def before(): Unit = { super.before() // in order to cover more code paths - tEnv.getConfig.setIdleStateRetentionTime(Time.hours(1), Time.hours(2)) + tEnv.getConfig.setIdleStateRetention(Duration.ofHours(1)) if (aggMode.isLocalAggEnabled) { tEnv.getConfig.set( OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,