This is an automated email from the ASF dual-hosted git repository.

felixcheung pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 13fd50e  [ZEPPELIN-3914] upgrade Flink to 1.7.1 (#3266)
13fd50e is described below

commit 13fd50ee535cd28b237cb4af740d0f893dbff3ae
Author: Xue Yu <278006...@qq.com>
AuthorDate: Tue Jan 1 09:22:42 2019 +0800

    [ZEPPELIN-3914] upgrade Flink to 1.7.1 (#3266)
    
    ### What is this PR for?
    This PR upgrades the current Flink version to 1.7.1.
    
    ### What type of PR is it?
    [Improvement]
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-3914
    
    ### How should this be tested?
    * manual
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update?
    no
    * Are there breaking changes for older versions?
    no
    * Does this need documentation?
    no
---
 docs/interpreter/flink.md                                   |  2 +-
 flink/pom.xml                                               |  2 +-
 .../apache/zeppelin/flink/FlinkSQLScalaInterpreter.scala    |  2 +-
 .../org/apache/zeppelin/flink/FlinkScalaInterpreter.scala   | 13 ++++---------
 4 files changed, 7 insertions(+), 12 deletions(-)

diff --git a/docs/interpreter/flink.md b/docs/interpreter/flink.md
index 2cf3125..d3f2223 100644
--- a/docs/interpreter/flink.md
+++ b/docs/interpreter/flink.md
@@ -50,7 +50,7 @@ At the "Interpreters" menu, you have to create a new Flink 
interpreter and provi
   </tr>
 </table>
 
-For more information about Flink configuration, you can find it 
[here](https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html).
+For more information about Flink configuration, you can find it 
[here](https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html).
 
 ## How to test it's working
 You can find an example of Flink usage in the Zeppelin Tutorial folder or try 
the following word count example, by using the [Zeppelin 
notebook](https://www.zeppelinhub.com/viewer/notebooks/aHR0cHM6Ly9yYXcuZ2l0aHVidXNlcmNvbnRlbnQuY29tL05GTGFicy96ZXBwZWxpbi1ub3RlYm9va3MvbWFzdGVyL25vdGVib29rcy8yQVFFREs1UEMvbm90ZS5qc29u)
 from Till Rohrmann's presentation [Interactive data analysis with Apache 
Flink](http://www.slideshare.net/tillrohrmann/data-analysis-49806564) for 
Apache Flink Meetup.
diff --git a/flink/pom.xml b/flink/pom.xml
index 7a374f2..331e19c 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -36,7 +36,7 @@
   <properties>
     <!--library versions-->
     <interpreter.name>flink</interpreter.name>
-    <flink.version>1.5.2</flink.version>
+    <flink.version>1.7.1</flink.version>
     <flink.akka.version>2.3.7</flink.akka.version>
     <scala.macros.version>2.0.1</scala.macros.version>
     <scala.binary.version>2.11</scala.binary.version>
diff --git 
a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkSQLScalaInterpreter.scala 
b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkSQLScalaInterpreter.scala
index 1694a44..b2d8d16 100644
--- 
a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkSQLScalaInterpreter.scala
+++ 
b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkSQLScalaInterpreter.scala
@@ -30,7 +30,7 @@ class FlinkSQLScalaInterpreter(scalaInterpreter: 
FlinkScalaInterpreter,
 
   def interpret(code: String, context: InterpreterContext): InterpreterResult 
= {
     try {
-      val table: Table = this.btenv.sql(code)
+      val table: Table = this.btenv.sqlQuery(code)
       val result = z.showData(table)
       return new InterpreterResult(InterpreterResult.Code.SUCCESS, result)
     } catch {
diff --git 
a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala 
b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index 14f8959..1d8b27e 100644
--- a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -26,7 +26,7 @@ import org.apache.flink.api.scala.FlinkShell._
 import org.apache.flink.api.scala.{ExecutionEnvironment, FlinkILoop}
 import org.apache.flink.client.program.ClusterClient
 import org.apache.flink.configuration.GlobalConfiguration
-import org.apache.flink.runtime.minicluster.{MiniCluster, 
StandaloneMiniCluster}
+import org.apache.flink.runtime.minicluster.MiniCluster
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.scala.{BatchTableEnvironment, 
StreamTableEnvironment}
@@ -45,8 +45,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
   lazy val LOGGER: Logger = LoggerFactory.getLogger(getClass)
 
   private var flinkILoop: FlinkILoop = _
-  private var cluster: Option[Either[Either[StandaloneMiniCluster, 
MiniCluster],
-    ClusterClient[_]]] = _
+  private var cluster: Option[Either[MiniCluster, ClusterClient[_]]] = _
   private var scalaCompleter: ScalaCompleter = _
   private val interpreterOutput = new InterpreterOutputStream(LOGGER)
 
@@ -68,8 +67,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
     val (iLoop, cluster) = try {
       val (host, port, cluster) = fetchConnectionInfo(configuration, config)
       val conf = cluster match {
-        case Some(Left(Left(miniCluster))) => miniCluster.getConfiguration
-        case Some(Left(Right(_))) => configuration
+        case Some(Left(miniCluster)) => configuration
         case Some(Right(yarnCluster)) => yarnCluster.getFlinkConfiguration
         case None => configuration
       }
@@ -213,10 +211,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
     }
     if (cluster != null) {
       cluster match {
-        case Some(Left(Left(legacyMiniCluster))) =>
-          LOGGER.info("Shutdown LegacyMiniCluster")
-          legacyMiniCluster.close()
-        case Some(Left(Right(newMiniCluster))) =>
+        case Some(Left(newMiniCluster)) =>
           LOGGER.info("Shutdown NewMiniCluster")
           newMiniCluster.close()
         case Some(Right(yarnCluster)) =>

Reply via email to