This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1bfde8119d242c0954d7e5b6eea74a0dd43e4c7c Author: JunRuiLee <jrlee....@gmail.com> AuthorDate: Sat Jan 28 14:16:54 2023 +0800 [hotfix][runtime] remove ExecutionConfig#setScheduler --- .../org/apache/flink/api/common/ExecutionConfig.java | 16 +++------------- .../planner/plan/batch/sql/ForwardHashExchangeTest.java | 2 -- .../planner/runtime/batch/ParallelismSettingTest.java | 1 - .../runtime/batch/sql/ForwardHashExchangeITCase.java | 2 -- 4 files changed, 3 insertions(+), 18 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java index 06473aa9308..f5e027a1939 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java @@ -21,7 +21,6 @@ package org.apache.flink.api.common; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.Public; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; @@ -469,17 +468,6 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut } } - /** - * TODO: this shouldn't exist and shouldn't pollute public API. 
Tests should change this via - * Configuration - */ - @VisibleForTesting - @Internal - public ExecutionConfig setScheduler(SchedulerType schedulerType) { - configuration.set(JobManagerOptions.SCHEDULER, schedulerType); - return this; - } - @Internal public boolean isDynamicGraph() { return configuration.get(JobManagerOptions.SCHEDULER) == SchedulerType.AdaptiveBatch; @@ -1162,7 +1150,9 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut .map(c -> loadClasses(c, classLoader, "Could not load kryo type to be registered.")) .ifPresent(c -> this.registeredKryoTypes = c); - configuration.getOptional(JobManagerOptions.SCHEDULER).ifPresent(this::setScheduler); + configuration + .getOptional(JobManagerOptions.SCHEDULER) + .ifPresent(t -> this.configuration.set(JobManagerOptions.SCHEDULER, t)); } /** diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/ForwardHashExchangeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/ForwardHashExchangeTest.java index c9d3e42edae..33fe021ac3e 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/ForwardHashExchangeTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/ForwardHashExchangeTest.java @@ -17,7 +17,6 @@ package org.apache.flink.table.planner.plan.batch.sql; -import org.apache.flink.configuration.JobManagerOptions.SchedulerType; import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.api.config.OptimizerConfigOptions; @@ -36,7 +35,6 @@ public class ForwardHashExchangeTest extends TableTestBase { public void before() { util = batchTestUtil(TableConfig.getDefault()); - util.getStreamEnv().getConfig().setScheduler(SchedulerType.AdaptiveBatch); util.tableEnv() .getConfig() .getConfiguration() diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/ParallelismSettingTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/ParallelismSettingTest.java index 590c8963b4f..1da85aac147 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/ParallelismSettingTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/ParallelismSettingTest.java @@ -18,7 +18,6 @@ package org.apache.flink.table.planner.runtime.batch; import org.apache.flink.api.dag.Transformation; -import org.apache.flink.configuration.JobManagerOptions.SchedulerType; import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.operations.ModifyOperation; diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/ForwardHashExchangeITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/ForwardHashExchangeITCase.java index f98ffce0af5..8ba1022f433 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/ForwardHashExchangeITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/ForwardHashExchangeITCase.java @@ -19,7 +19,6 @@ package org.apache.flink.table.planner.runtime.batch.sql; import org.apache.flink.api.common.BatchShuffleMode; import org.apache.flink.configuration.ExecutionOptions; -import org.apache.flink.configuration.JobManagerOptions.SchedulerType; import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.planner.factories.TestValuesTableFactory; import org.apache.flink.table.planner.runtime.utils.BatchTestBase; @@ -38,7 +37,6 @@ public class ForwardHashExchangeITCase extends 
BatchTestBase { @Before public void before() throws Exception { super.before(); - env().getConfig().setScheduler(SchedulerType.AdaptiveBatch); env().disableOperatorChaining(); tEnv().getConfig() .set(ExecutionOptions.BATCH_SHUFFLE_MODE, BatchShuffleMode.ALL_EXCHANGES_BLOCKING);