This is an automated email from the ASF dual-hosted git repository. kurt pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit a9f8a0b4481719fb511436a61e36cb0e26559c79 Author: Kurt Young <k...@apache.org> AuthorDate: Thu Apr 23 22:20:05 2020 +0800 [FLINK-17339][misc] Update tests due to default planner changing. --- flink-connectors/flink-connector-cassandra/pom.xml | 2 +- .../java/org/apache/flink/sql/tests/StreamSQLTestProgram.java | 2 +- .../src/main/java/org/apache/flink/ml/common/MLEnvironment.java | 5 ++++- .../test/java/org/apache/flink/ml/common/MLEnvironmentTest.java | 4 +++- flink-python/pyflink/table/tests/test_environment_settings.py | 4 ++-- flink-python/pyflink/table/tests/test_table_environment_api.py | 6 +++--- flink-python/pyflink/testing/test_case_utils.py | 8 ++++++-- .../src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala | 5 +++-- 8 files changed, 23 insertions(+), 13 deletions(-) diff --git a/flink-connectors/flink-connector-cassandra/pom.xml b/flink-connectors/flink-connector-cassandra/pom.xml index f66d5ec..b7aa6d1 100644 --- a/flink-connectors/flink-connector-cassandra/pom.xml +++ b/flink-connectors/flink-connector-cassandra/pom.xml @@ -172,7 +172,7 @@ under the License. <!-- A planner dependency won't be necessary once FLIP-32 has been completed. --> <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-table-planner_${scala.binary.version}</artifactId> + <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> <version>${project.version}</version> <scope>provided</scope> <optional>true</optional> diff --git a/flink-end-to-end-tests/flink-stream-sql-test/src/main/java/org/apache/flink/sql/tests/StreamSQLTestProgram.java b/flink-end-to-end-tests/flink-stream-sql-test/src/main/java/org/apache/flink/sql/tests/StreamSQLTestProgram.java index 40a4005..8d08466 100644 --- a/flink-end-to-end-tests/flink-stream-sql-test/src/main/java/org/apache/flink/sql/tests/StreamSQLTestProgram.java +++ b/flink-end-to-end-tests/flink-stream-sql-test/src/main/java/org/apache/flink/sql/tests/StreamSQLTestProgram.java @@ -84,7 +84,7 @@ public class StreamSQLTestProgram { ParameterTool params = ParameterTool.fromArgs(args); String outputPath = params.getRequired("outputPath"); - String planner = params.get("planner", "old"); + String planner = params.get("planner", "blink"); final EnvironmentSettings.Builder builder = EnvironmentSettings.newInstance(); builder.inStreamingMode(); diff --git a/flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/MLEnvironment.java b/flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/MLEnvironment.java index f9decea..595ac2c 100644 --- a/flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/MLEnvironment.java +++ b/flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/MLEnvironment.java @@ -21,6 +21,7 @@ package org.apache.flink.ml.common; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.java.BatchTableEnvironment; import org.apache.flink.table.api.java.StreamTableEnvironment; @@ -150,7 +151,9 @@ public class MLEnvironment { */ public StreamTableEnvironment getStreamTableEnvironment() { if (null == streamTableEnv) { - streamTableEnv = StreamTableEnvironment.create(getStreamExecutionEnvironment()); + streamTableEnv = StreamTableEnvironment.create( + getStreamExecutionEnvironment(), + EnvironmentSettings.newInstance().useOldPlanner().build()); } return streamTableEnv; } diff --git a/flink-ml-parent/flink-ml-lib/src/test/java/org/apache/flink/ml/common/MLEnvironmentTest.java b/flink-ml-parent/flink-ml-lib/src/test/java/org/apache/flink/ml/common/MLEnvironmentTest.java index 60b18fe..50f87c5 100644 --- a/flink-ml-parent/flink-ml-lib/src/test/java/org/apache/flink/ml/common/MLEnvironmentTest.java +++ b/flink-ml-parent/flink-ml-lib/src/test/java/org/apache/flink/ml/common/MLEnvironmentTest.java @@ -21,6 +21,7 @@ package org.apache.flink.ml.common; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.java.BatchTableEnvironment; import org.apache.flink.table.api.java.StreamTableEnvironment; @@ -56,7 +57,8 @@ public class MLEnvironmentTest { @Test public void testConstructWithStreamEnv() { StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); - StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(streamExecutionEnvironment); + StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create( + streamExecutionEnvironment, EnvironmentSettings.newInstance().useOldPlanner().build()); MLEnvironment mlEnvironment = new MLEnvironment(streamExecutionEnvironment, streamTableEnvironment); diff --git a/flink-python/pyflink/table/tests/test_environment_settings.py b/flink-python/pyflink/table/tests/test_environment_settings.py index e493107..24c33a4 100644 --- a/flink-python/pyflink/table/tests/test_environment_settings.py +++ b/flink-python/pyflink/table/tests/test_environment_settings.py @@ -41,11 +41,11 @@ class EnvironmentSettingsTests(PyFlinkTestCase): self.assertEqual( envrionment_settings._j_environment_settings.toPlannerProperties()[CLASS_NAME], - OLD_PLANNER_FACTORY) + BLINK_PLANNER_FACTORY) self.assertEqual( envrionment_settings._j_environment_settings.toExecutorProperties()[CLASS_NAME], - OLD_EXECUTOR_FACTORY) + BLINK_EXECUTOR_FACTORY) # test use_old_planner envrionment_settings = builder.use_old_planner().build() diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py index f03a0cc..308286c 100644 --- a/flink-python/pyflink/table/tests/test_table_environment_api.py +++ b/flink-python/pyflink/table/tests/test_table_environment_api.py @@ -275,16 +275,16 @@ class StreamTableEnvironmentTests(TableEnvironmentTest, PyFlinkStreamTableTestCa self.assertEqual( planner.getClass().getName(), - "org.apache.flink.table.planner.StreamPlanner") + "org.apache.flink.table.planner.delegation.StreamPlanner") t_env = StreamTableEnvironment.create( - environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build()) + environment_settings=EnvironmentSettings.new_instance().use_old_planner().build()) planner = t_env._j_tenv.getPlanner() self.assertEqual( planner.getClass().getName(), - "org.apache.flink.table.planner.delegation.StreamPlanner") + "org.apache.flink.table.planner.StreamPlanner") def test_table_environment_with_blink_planner(self): self.env.set_parallelism(1) diff --git a/flink-python/pyflink/testing/test_case_utils.py b/flink-python/pyflink/testing/test_case_utils.py index 2ada340..35e889a 100644 --- a/flink-python/pyflink/testing/test_case_utils.py +++ b/flink-python/pyflink/testing/test_case_utils.py @@ -28,6 +28,7 @@ from abc import abstractmethod from py4j.java_gateway import JavaObject from py4j.protocol import Py4JJavaError +from pyflink.table import TableConfig from pyflink.table.sources import CsvTableSource from pyflink.dataset.execution_environment import ExecutionEnvironment from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment @@ -123,7 +124,10 @@ class PyFlinkStreamTableTestCase(PyFlinkTestCase): super(PyFlinkStreamTableTestCase, self).setUp() self.env = StreamExecutionEnvironment.get_execution_environment() self.env.set_parallelism(2) - self.t_env = StreamTableEnvironment.create(self.env) + self.t_env = StreamTableEnvironment.create( + self.env, + environment_settings=EnvironmentSettings.new_instance() + .in_streaming_mode().use_old_planner().build()) class PyFlinkBatchTableTestCase(PyFlinkTestCase): @@ -135,7 +139,7 @@ class PyFlinkBatchTableTestCase(PyFlinkTestCase): super(PyFlinkBatchTableTestCase, self).setUp() self.env = ExecutionEnvironment.get_execution_environment() self.env.set_parallelism(2) - self.t_env = BatchTableEnvironment.create(self.env) + self.t_env = BatchTableEnvironment.create(self.env, TableConfig()) def collect(self, table): j_table = table._j_table diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala index ee69cb1..6368589 100644 --- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala +++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala @@ -19,10 +19,10 @@ package org.apache.flink.api.scala import java.io.{BufferedReader, File, FileOutputStream} - import org.apache.flink.api.java.{JarHelper, ScalaShellEnvironment, ScalaShellStreamEnvironment} import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.configuration.Configuration +import org.apache.flink.table.api.EnvironmentSettings import org.apache.flink.table.api.scala.{BatchTableEnvironment, StreamTableEnvironment} import org.apache.flink.util.AbstractID @@ -90,7 +90,8 @@ class FlinkILoop( val scalaBenv = new ExecutionEnvironment(remoteBenv) val scalaSenv = new StreamExecutionEnvironment(remoteSenv) val scalaBTEnv = BatchTableEnvironment.create(scalaBenv) - val scalaSTEnv = StreamTableEnvironment.create(scalaSenv) + val scalaSTEnv = StreamTableEnvironment.create( + scalaSenv, EnvironmentSettings.newInstance().useOldPlanner().build()) (scalaBenv,scalaSenv,scalaBTEnv,scalaSTEnv) }