This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a9f8a0b4481719fb511436a61e36cb0e26559c79
Author: Kurt Young <k...@apache.org>
AuthorDate: Thu Apr 23 22:20:05 2020 +0800

    [FLINK-17339][misc] Update tests due to default planner changing.
---
 flink-connectors/flink-connector-cassandra/pom.xml                | 2 +-
 .../java/org/apache/flink/sql/tests/StreamSQLTestProgram.java     | 2 +-
 .../src/main/java/org/apache/flink/ml/common/MLEnvironment.java   | 5 ++++-
 .../test/java/org/apache/flink/ml/common/MLEnvironmentTest.java   | 4 +++-
 flink-python/pyflink/table/tests/test_environment_settings.py     | 4 ++--
 flink-python/pyflink/table/tests/test_table_environment_api.py    | 6 +++---
 flink-python/pyflink/testing/test_case_utils.py                   | 8 ++++++--
 .../src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala    | 5 +++--
 8 files changed, 23 insertions(+), 13 deletions(-)
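
FLINK-17339 makes the Blink planner the default, so the changes below either
accept the new default or pin the legacy planner explicitly. A minimal Java
sketch of the two options (the class name and comments are illustrative and
not part of this commit; the API calls are the ones used in the hunks that
follow):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.java.StreamTableEnvironment;

    public class PlannerSelectionSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            // With the new default, creating an environment without settings
            // now yields the Blink planner.
            StreamTableEnvironment blinkTableEnv = StreamTableEnvironment.create(env);

            // Tests that still target the legacy planner request it explicitly,
            // as MLEnvironment, FlinkILoop and the pyflink test utilities do below.
            StreamTableEnvironment legacyTableEnv = StreamTableEnvironment.create(
                    env,
                    EnvironmentSettings.newInstance()
                            .inStreamingMode()
                            .useOldPlanner()
                            .build());
        }
    }

The pyflink changes mirror this with use_old_planner() on the Python
EnvironmentSettings builder, and the Cassandra connector swaps its provided
planner dependency to flink-table-planner-blink accordingly.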

diff --git a/flink-connectors/flink-connector-cassandra/pom.xml b/flink-connectors/flink-connector-cassandra/pom.xml
index f66d5ec..b7aa6d1 100644
--- a/flink-connectors/flink-connector-cassandra/pom.xml
+++ b/flink-connectors/flink-connector-cassandra/pom.xml
@@ -172,7 +172,7 @@ under the License.
                <!-- A planner dependency won't be necessary once FLIP-32 has been completed. -->
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+                       <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
                        <version>${project.version}</version>
                        <scope>provided</scope>
                        <optional>true</optional>
diff --git a/flink-end-to-end-tests/flink-stream-sql-test/src/main/java/org/apache/flink/sql/tests/StreamSQLTestProgram.java b/flink-end-to-end-tests/flink-stream-sql-test/src/main/java/org/apache/flink/sql/tests/StreamSQLTestProgram.java
index 40a4005..8d08466 100644
--- a/flink-end-to-end-tests/flink-stream-sql-test/src/main/java/org/apache/flink/sql/tests/StreamSQLTestProgram.java
+++ b/flink-end-to-end-tests/flink-stream-sql-test/src/main/java/org/apache/flink/sql/tests/StreamSQLTestProgram.java
@@ -84,7 +84,7 @@ public class StreamSQLTestProgram {
 
                ParameterTool params = ParameterTool.fromArgs(args);
                String outputPath = params.getRequired("outputPath");
-               String planner = params.get("planner", "old");
+               String planner = params.get("planner", "blink");
 
                final EnvironmentSettings.Builder builder = EnvironmentSettings.newInstance();
                builder.inStreamingMode();
diff --git a/flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/MLEnvironment.java b/flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/MLEnvironment.java
index f9decea..595ac2c 100644
--- a/flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/MLEnvironment.java
+++ b/flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/MLEnvironment.java
@@ -21,6 +21,7 @@ package org.apache.flink.ml.common;
 
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.java.BatchTableEnvironment;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
 
@@ -150,7 +151,9 @@ public class MLEnvironment {
         */
        public StreamTableEnvironment getStreamTableEnvironment() {
                if (null == streamTableEnv) {
-                       streamTableEnv = StreamTableEnvironment.create(getStreamExecutionEnvironment());
+                       streamTableEnv = StreamTableEnvironment.create(
+                                       getStreamExecutionEnvironment(),
+                                       EnvironmentSettings.newInstance().useOldPlanner().build());
                }
                return streamTableEnv;
        }
diff --git a/flink-ml-parent/flink-ml-lib/src/test/java/org/apache/flink/ml/common/MLEnvironmentTest.java b/flink-ml-parent/flink-ml-lib/src/test/java/org/apache/flink/ml/common/MLEnvironmentTest.java
index 60b18fe..50f87c5 100644
--- a/flink-ml-parent/flink-ml-lib/src/test/java/org/apache/flink/ml/common/MLEnvironmentTest.java
+++ b/flink-ml-parent/flink-ml-lib/src/test/java/org/apache/flink/ml/common/MLEnvironmentTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.ml.common;
 
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.java.BatchTableEnvironment;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
 
@@ -56,7 +57,8 @@ public class MLEnvironmentTest {
        @Test
        public void testConstructWithStreamEnv() {
                StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
-               StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(streamExecutionEnvironment);
+               StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(
+                               streamExecutionEnvironment, EnvironmentSettings.newInstance().useOldPlanner().build());
 
                MLEnvironment mlEnvironment = new MLEnvironment(streamExecutionEnvironment, streamTableEnvironment);
 
diff --git a/flink-python/pyflink/table/tests/test_environment_settings.py b/flink-python/pyflink/table/tests/test_environment_settings.py
index e493107..24c33a4 100644
--- a/flink-python/pyflink/table/tests/test_environment_settings.py
+++ b/flink-python/pyflink/table/tests/test_environment_settings.py
@@ -41,11 +41,11 @@ class EnvironmentSettingsTests(PyFlinkTestCase):
 
         self.assertEqual(
             envrionment_settings._j_environment_settings.toPlannerProperties()[CLASS_NAME],
-            OLD_PLANNER_FACTORY)
+            BLINK_PLANNER_FACTORY)
 
         self.assertEqual(
             envrionment_settings._j_environment_settings.toExecutorProperties()[CLASS_NAME],
-            OLD_EXECUTOR_FACTORY)
+            BLINK_EXECUTOR_FACTORY)
 
         # test use_old_planner
         envrionment_settings = builder.use_old_planner().build()
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py
index f03a0cc..308286c 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -275,16 +275,16 @@ class StreamTableEnvironmentTests(TableEnvironmentTest, PyFlinkStreamTableTestCa
 
         self.assertEqual(
             planner.getClass().getName(),
-            "org.apache.flink.table.planner.StreamPlanner")
+            "org.apache.flink.table.planner.delegation.StreamPlanner")
 
        t_env = StreamTableEnvironment.create(
-            environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())
+            environment_settings=EnvironmentSettings.new_instance().use_old_planner().build())
 
         planner = t_env._j_tenv.getPlanner()
 
         self.assertEqual(
             planner.getClass().getName(),
-            "org.apache.flink.table.planner.delegation.StreamPlanner")
+            "org.apache.flink.table.planner.StreamPlanner")
 
     def test_table_environment_with_blink_planner(self):
         self.env.set_parallelism(1)
diff --git a/flink-python/pyflink/testing/test_case_utils.py b/flink-python/pyflink/testing/test_case_utils.py
index 2ada340..35e889a 100644
--- a/flink-python/pyflink/testing/test_case_utils.py
+++ b/flink-python/pyflink/testing/test_case_utils.py
@@ -28,6 +28,7 @@ from abc import abstractmethod
 from py4j.java_gateway import JavaObject
 from py4j.protocol import Py4JJavaError
 
+from pyflink.table import TableConfig
 from pyflink.table.sources import CsvTableSource
 from pyflink.dataset.execution_environment import ExecutionEnvironment
 from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment
@@ -123,7 +124,10 @@ class PyFlinkStreamTableTestCase(PyFlinkTestCase):
         super(PyFlinkStreamTableTestCase, self).setUp()
         self.env = StreamExecutionEnvironment.get_execution_environment()
         self.env.set_parallelism(2)
-        self.t_env = StreamTableEnvironment.create(self.env)
+        self.t_env = StreamTableEnvironment.create(
+            self.env,
+            environment_settings=EnvironmentSettings.new_instance()
+                .in_streaming_mode().use_old_planner().build())
 
 
 class PyFlinkBatchTableTestCase(PyFlinkTestCase):
@@ -135,7 +139,7 @@ class PyFlinkBatchTableTestCase(PyFlinkTestCase):
         super(PyFlinkBatchTableTestCase, self).setUp()
         self.env = ExecutionEnvironment.get_execution_environment()
         self.env.set_parallelism(2)
-        self.t_env = BatchTableEnvironment.create(self.env)
+        self.t_env = BatchTableEnvironment.create(self.env, TableConfig())
 
     def collect(self, table):
         j_table = table._j_table
diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
index ee69cb1..6368589 100644
--- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
+++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
@@ -19,10 +19,10 @@
 package org.apache.flink.api.scala
 
 import java.io.{BufferedReader, File, FileOutputStream}
-
 import org.apache.flink.api.java.{JarHelper, ScalaShellEnvironment, ScalaShellStreamEnvironment}
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.api.EnvironmentSettings
 import org.apache.flink.table.api.scala.{BatchTableEnvironment, StreamTableEnvironment}
 import org.apache.flink.util.AbstractID
 
@@ -90,7 +90,8 @@ class FlinkILoop(
     val scalaBenv = new ExecutionEnvironment(remoteBenv)
     val scalaSenv = new StreamExecutionEnvironment(remoteSenv)
     val scalaBTEnv = BatchTableEnvironment.create(scalaBenv)
-    val scalaSTEnv = StreamTableEnvironment.create(scalaSenv)
+    val scalaSTEnv = StreamTableEnvironment.create(
+      scalaSenv, EnvironmentSettings.newInstance().useOldPlanner().build())
     (scalaBenv,scalaSenv,scalaBTEnv,scalaSTEnv)
   }
 
