This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 4767c5f  [ZEPPELIN-4656]. Improvement of FlinkInterpreter
4767c5f is described below

commit 4767c5f3640ab02f08f1928f1b74a9119e379354
Author: Jeff Zhang <zjf...@apache.org>
AuthorDate: Sun Mar 1 13:56:02 2020 +0800

    [ZEPPELIN-4656]. Improvement of FlinkInterpreter
    
    ### What is this PR for?
    
    This is a follow-up PR of ZEPPELIN-4488. What is done in this PR:
    1. Add missing properties in `interpreter-setting.json`
    2. Fix the concurrency issue of running multiple SQL statements simultaneously (see the locking sketch below)
    3. Add setting `flink.webui.yarn.useProxy` to allow using the yarn proxy url as the Flink web UI url
    4. Delete the staging dir in yarn mode after the cluster is shut down
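
    A minimal, self-contained sketch of the serialization pattern behind item 2
    (class and method names here are illustrative, not from this commit): each
    statement takes a write lock before touching the shared TableEnvironment,
    and only the holding thread ever unlocks it.

    ```java
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class SqlSerializationSketch {
      // one write lock shared by all SQL paragraphs of a session
      private final ReentrantReadWriteLock.WriteLock lock =
          new ReentrantReadWriteLock().writeLock();

      void runStatement(Runnable statement) {
        try {
          lock.lock();
          statement.run();                     // e.g. tbenv.sqlUpdate(...)
        } finally {
          if (lock.isHeldByCurrentThread()) {  // a JobListener may unlock first
            lock.unlock();
          }
        }
      }

      public static void main(String[] args) throws InterruptedException {
        SqlSerializationSketch s = new SqlSerializationSketch();
        Thread t1 = new Thread(() -> s.runStatement(() -> System.out.println("sql 1")));
        Thread t2 = new Thread(() -> s.runStatement(() -> System.out.println("sql 2")));
        t1.start(); t2.start();
        t1.join(); t2.join();
      }
    }
    ```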
    
    ### What type of PR is it?
    Improvement
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://jira.apache.org/jira/browse/ZEPPELIN-4656
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need updating? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zjf...@apache.org>
    
    Closes #3676 from zjffdu/ZEPPELIN-4656 and squashes the following commits:
    
    06c3c0a85 [Jeff Zhang] [ZEPPELIN-4656]. Improvement of FlinkInterpreter
---
 docs/interpreter/flink.md                          | 15 ++++
 .../zeppelin/flink/FlinkBatchSqlInterpreter.java   | 21 +----
 .../apache/zeppelin/flink/FlinkInterpreter.java    | 15 +++-
 .../apache/zeppelin/flink/FlinkSqlInterrpeter.java | 90 +++++++++++++++-------
 .../zeppelin/flink/FlinkStreamSqlInterpreter.java  | 73 ++++++++----------
 .../apache/zeppelin/flink/IPyFlinkInterpreter.java |  7 +-
 .../java/org/apache/zeppelin/flink/JobManager.java |  3 +-
 flink/src/main/resources/interpreter-setting.json  | 25 +++++-
 .../zeppelin/flink/FlinkScalaInterpreter.scala     | 65 ++++++++++++++--
 9 files changed, 216 insertions(+), 98 deletions(-)

diff --git a/docs/interpreter/flink.md b/docs/interpreter/flink.md
index dfc3b57..ffef1e7 100644
--- a/docs/interpreter/flink.md
+++ b/docs/interpreter/flink.md
@@ -132,6 +132,11 @@ You can also set other flink properties which are not listed in the table. For a
     <td>queue name of yarn app</td>
   </tr>
   <tr>
+    <td>flink.webui.yarn.useProxy</td>
+    <td>false</td>
+    <td>whether use yarn proxy url as flink weburl, e.g. http://localhost:8088/proxy/application_1583396598068_0004</td>
+  </tr>
+  <tr>
     <td>flink.udf.jars</td>
     <td></td>
     <td>udf jars (comma separated), zeppelin will register udf in this jar automatically for user. The udf name is the class name.</td>
@@ -186,6 +191,16 @@ You can also set other flink properties which are not listed in the table. For a
     <td>1000</td>
     <td>max number of row returned by sql interpreter</td>
   </tr>
+  <tr>
+    <td>flink.interpreter.close.shutdown_cluster</td>
+    <td>true</td>
+    <td>Whether shutdown application when closing interpreter</td>
+  </tr>
+  <tr>
+    <td>zeppelin.interpreter.close.cancel_job</td>
+    <td>true</td>
+    <td>Whether cancel flink job when closing interpreter</td>
+  </tr>
 </table>
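
A quick sketch of how the two new close-time flags documented above would be
read (the class name is illustrative; the property keys and defaults are the
ones in the table):

```java
import java.util.Properties;

public class CloseFlagsSketch {
  public static void main(String[] args) {
    Properties props = new Properties();  // interpreter properties in Zeppelin

    // both default to true, matching the table above
    boolean shutdownCluster = Boolean.parseBoolean(
        props.getProperty("flink.interpreter.close.shutdown_cluster", "true"));
    boolean cancelJob = Boolean.parseBoolean(
        props.getProperty("zeppelin.interpreter.close.cancel_job", "true"));

    System.out.println("shutdown cluster on close: " + shutdownCluster);
    System.out.println("cancel flink jobs on close: " + cancelJob);
  }
}
```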
 
 
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java
index e3a12e4..ba5319c 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java
@@ -55,23 +55,10 @@ public class FlinkBatchSqlInterpreter extends FlinkSqlInterrpeter {
 
   @Override
   public void callInnerSelect(String sql, InterpreterContext context) throws IOException {
-    int defaultSqlParallelism = this.tbenv.getConfig().getConfiguration()
-            .getInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);
-    try {
-      if (context.getLocalProperties().containsKey("parallelism")) {
-        this.tbenv.getConfig().getConfiguration()
-                .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
-                        Integer.parseInt(context.getLocalProperties().get("parallelism")));
-      }
-      Table table = this.tbenv.sqlQuery(sql);
-      z.setCurrentSql(sql);
-      String result = z.showData(table);
-      context.out.write(result);
-    } finally {
-      this.tbenv.getConfig().getConfiguration()
-              .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
-                      defaultSqlParallelism);
-    }
+    Table table = this.tbenv.sqlQuery(sql);
+    z.setCurrentSql(sql);
+    String result = z.showData(table);
+    context.out.write(result);
   }
 
   @Override
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
index e32df4f..b089b36 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
@@ -66,7 +66,16 @@ public class FlinkInterpreter extends Interpreter {
     this.z.setInterpreterContext(context);
     this.z.setGui(context.getGui());
     this.z.setNoteGui(context.getNoteGui());
-    return innerIntp.interpret(st, context);
+
+    // set ClassLoader of current Thread to be the ClassLoader of Flink scala-shell,
+    // otherwise codegen will fail to find classes defined in scala-shell
+    ClassLoader originClassLoader = Thread.currentThread().getContextClassLoader();
+    try {
+      Thread.currentThread().setContextClassLoader(getFlinkScalaShellLoader());
+      return innerIntp.interpret(st, context);
+    } finally {
+      Thread.currentThread().setContextClassLoader(originClassLoader);
+    }
   }
 
   @Override
@@ -124,6 +133,10 @@ public class FlinkInterpreter extends Interpreter {
     return this.innerIntp.getDefaultParallelism();
   }
 
+  int getDefaultSqlParallelism() {
+    return this.innerIntp.getDefaultSqlParallelism();
+  }
+
   public ClassLoader getFlinkScalaShellLoader() {
     return innerIntp.getFlinkScalaShellLoader();
   }
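
The ClassLoader swap in interpret() above follows a standard save/restore
pattern. A self-contained sketch of it (the helper name is made up for
illustration):

```java
import java.util.function.Supplier;

public class ClassLoaderSwapSketch {
  // run a task with the given context ClassLoader, always restoring the original
  static <T> T withClassLoader(ClassLoader target, Supplier<T> task) {
    ClassLoader origin = Thread.currentThread().getContextClassLoader();
    try {
      Thread.currentThread().setContextClassLoader(target);
      return task.get();
    } finally {
      Thread.currentThread().setContextClassLoader(origin);
    }
  }

  public static void main(String[] args) {
    String out = withClassLoader(ClassLoaderSwapSketch.class.getClassLoader(),
        () -> "ran under the scala-shell loader");
    System.out.println(out);
  }
}
```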
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
index 5ebc33f..d794b4b 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
@@ -18,6 +18,7 @@
 
 package org.apache.zeppelin.flink;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.flink.api.common.JobExecutionResult;
@@ -26,6 +27,7 @@ import org.apache.flink.core.execution.JobListener;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.zeppelin.flink.sql.SqlCommandParser;
 import org.apache.zeppelin.flink.sql.SqlCommandParser.SqlCommand;
 import org.apache.zeppelin.interpreter.Interpreter;
@@ -40,21 +42,25 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public abstract class FlinkSqlInterrpeter extends Interpreter {
 
   protected static final Logger LOGGER = LoggerFactory.getLogger(FlinkSqlInterrpeter.class);
 
-  protected static final String MESSAGE_HELP = new AttributedStringBuilder()
+  public static final AttributedString MESSAGE_HELP = new AttributedStringBuilder()
           .append("The following commands are available:\n\n")
           .append(formatCommand(SqlCommand.CREATE_TABLE, "Create table under 
current catalog and database."))
-          .append(formatCommand(SqlCommand.DROP_TABLE,
-                  "Drop table with optional catalog and database. Syntax: 
'DROP TABLE [IF EXISTS] <name>;'"))
+          .append(formatCommand(SqlCommand.DROP_TABLE, "Drop table with 
optional catalog and database. Syntax: 'DROP TABLE [IF EXISTS] <name>;'"))
           .append(formatCommand(SqlCommand.CREATE_VIEW, "Creates a virtual 
table from a SQL query. Syntax: 'CREATE VIEW <name> AS <query>;'"))
           .append(formatCommand(SqlCommand.DESCRIBE, "Describes the schema of 
a table with the given name."))
           .append(formatCommand(SqlCommand.DROP_VIEW, "Deletes a previously 
created virtual table. Syntax: 'DROP VIEW <name>;'"))
@@ -65,20 +71,22 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
           .append(formatCommand(SqlCommand.SELECT, "Executes a SQL SELECT query on the Flink cluster."))
           .append(formatCommand(SqlCommand.SHOW_FUNCTIONS, "Shows all user-defined and built-in functions."))
           .append(formatCommand(SqlCommand.SHOW_TABLES, "Shows all registered tables."))
+          .append(formatCommand(SqlCommand.SOURCE, "Reads a SQL SELECT query from a file and executes it on the Flink cluster."))
           .append(formatCommand(SqlCommand.USE_CATALOG, "Sets the current catalog. The current database is set to the catalog's default one. Experimental! Syntax: 'USE CATALOG <name>;'"))
           .append(formatCommand(SqlCommand.USE, "Sets the current default database. Experimental! Syntax: 'USE <name>;'"))
           .style(AttributedStyle.DEFAULT.underline())
           .append("\nHint")
           .style(AttributedStyle.DEFAULT)
           .append(": Make sure that a statement ends with ';' for finalizing (multi-line) statements.")
-          .toAttributedString()
-          .toString();
+          .toAttributedString();
 
   protected FlinkInterpreter flinkInterpreter;
   protected TableEnvironment tbenv;
   protected TableEnvironment tbenv_2;
   private SqlSplitter sqlSplitter;
-  private ReentrantLock lock = new ReentrantLock();
+  private int defaultSqlParallelism;
+  private ReentrantReadWriteLock.WriteLock lock = new ReentrantReadWriteLock().writeLock();
+
 
   public FlinkSqlInterrpeter(Properties properties) {
     super(properties);
@@ -94,8 +102,10 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
     JobListener jobListener = new JobListener() {
       @Override
       public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable) {
-        lock.unlock();
-        LOGGER.info("UnLock JobSubmitLock");
+        if (lock.isHeldByCurrentThread()) {
+          lock.unlock();
+          LOGGER.info("UnLock JobSubmitLock");
+        }
       }
 
       @Override
@@ -106,6 +116,7 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
 
     flinkInterpreter.getExecutionEnvironment().getJavaEnv().registerJobListener(jobListener);
     flinkInterpreter.getStreamExecutionEnvironment().getJavaEnv().registerJobListener(jobListener);
+    this.defaultSqlParallelism = flinkInterpreter.getDefaultSqlParallelism();
   }
 
   @Override
@@ -134,7 +145,7 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
       if (!sqlCommand.isPresent()) {
         try {
           context.out.write("%text Invalid Sql statement: " + sql + "\n");
-          context.out.write(MESSAGE_HELP);
+          context.out.write(MESSAGE_HELP.toString());
         } catch (IOException e) {
           return new InterpreterResult(InterpreterResult.Code.ERROR, e.toString());
         }
@@ -175,6 +186,9 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
       case SHOW_TABLES:
         callShowTables(context);
         break;
+      case SOURCE:
+        callSource(cmdCall.operands[0], context);
+        break;
       case SHOW_FUNCTIONS:
         callShowFunctions(context);
         break;
@@ -231,10 +245,10 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
 
   private void callAlterTable(String sql, InterpreterContext context) throws IOException {
     try {
-      lock.tryLock();
+      lock.lock();
       this.tbenv.sqlUpdate(sql);
     } finally {
-      if (lock.isLocked()) {
+      if (lock.isHeldByCurrentThread()) {
         lock.unlock();
       }
     }
@@ -243,10 +257,10 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
 
   private void callAlterDatabase(String sql, InterpreterContext context) throws IOException {
     try {
-      lock.tryLock();
+      lock.lock();
       this.tbenv.sqlUpdate(sql);
     } finally {
-      if (lock.isLocked()) {
+      if (lock.isHeldByCurrentThread()) {
         lock.unlock();
       }
     }
@@ -257,7 +271,7 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
     try {
       this.tbenv.sqlUpdate(sql);
     } finally {
-      if (lock.isLocked()) {
+      if (lock.isHeldByCurrentThread()) {
         lock.unlock();
       }
     }
@@ -268,7 +282,7 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
     try {
       this.tbenv.sqlUpdate(sql);
     } finally {
-      if (lock.isLocked()) {
+      if (lock.isHeldByCurrentThread()) {
         lock.unlock();
       }
     }
@@ -285,7 +299,7 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
       lock.lock();
       this.tbenv.createTemporaryView(name, tbenv.sqlQuery(query));
     } finally {
-      if (lock.isLocked()) {
+      if (lock.isHeldByCurrentThread()) {
         lock.unlock();
       }
     }
@@ -294,10 +308,10 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
 
   private void callCreateTable(String sql, InterpreterContext context) throws IOException {
     try {
-      lock.tryLock();
+      lock.lock();
       this.tbenv.sqlUpdate(sql);
     } finally {
-      if (lock.isLocked()) {
+      if (lock.isHeldByCurrentThread()) {
         lock.unlock();
       }
     }
@@ -306,10 +320,10 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
 
   private void callDropTable(String sql, InterpreterContext context) throws IOException {
     try {
-      lock.tryLock();
+      lock.lock();
       this.tbenv.sqlUpdate(sql);
     } finally {
-      if (lock.isLocked()) {
+      if (lock.isHeldByCurrentThread()) {
         lock.unlock();
       }
     }
@@ -326,7 +340,7 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
   }
 
   private void callHelp(InterpreterContext context) throws IOException {
-    context.out.write(MESSAGE_HELP);
+    context.out.write(MESSAGE_HELP.toString());
   }
 
   private void callShowCatalogs(InterpreterContext context) throws IOException {
@@ -346,6 +360,11 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
             "%table table\n" + StringUtils.join(tables, "\n") + "\n");
   }
 
+  private void callSource(String sqlFile, InterpreterContext context) throws IOException {
+    String sql = IOUtils.toString(new FileInputStream(sqlFile));
+    runSqlList(sql, context);
+  }
+
   private void callShowFunctions(InterpreterContext context) throws IOException {
     String[] functions = this.tbenv.listUserDefinedFunctions();
     context.out.write(
@@ -369,11 +388,11 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
 
   private void callExplain(String sql, InterpreterContext context) throws IOException {
     try {
-      lock.tryLock();
+      lock.lock();
       Table table = this.tbenv.sqlQuery(sql);
       context.out.write(this.tbenv.explain(table) + "\n");
     } finally {
-      if (lock.isLocked()) {
+      if (lock.isHeldByCurrentThread()) {
         lock.unlock();
       }
     }
@@ -381,12 +400,21 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
 
   public void callSelect(String sql, InterpreterContext context) throws IOException {
     try {
-      lock.tryLock();
+      lock.lock();
+      if (context.getLocalProperties().containsKey("parallelism")) {
+        this.tbenv.getConfig().getConfiguration()
+                .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
+                        Integer.parseInt(context.getLocalProperties().get("parallelism")));
+      }
       callInnerSelect(sql, context);
+
     } finally {
-      if (lock.isLocked()) {
+      if (lock.isHeldByCurrentThread()) {
         lock.unlock();
       }
+      this.tbenv.getConfig().getConfiguration()
+              .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
+                      defaultSqlParallelism);
     }
   }
 
@@ -398,15 +426,23 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
        context.getLocalProperties().put("flink.streaming.insert_into", "true");
      }
      try {
-       lock.tryLock();
+       lock.lock();
+       if (context.getLocalProperties().containsKey("parallelism")) {
+         this.tbenv.getConfig().getConfiguration()
+                 .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
+                         Integer.parseInt(context.getLocalProperties().get("parallelism")));
+       }
        this.tbenv.sqlUpdate(sql);
        this.tbenv.execute(sql);
      } catch (Exception e) {
        throw new IOException(e);
      } finally {
-       if (lock.isLocked()) {
+       if (lock.isHeldByCurrentThread()) {
          lock.unlock();
        }
+       this.tbenv.getConfig().getConfiguration()
+               .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
+                       defaultSqlParallelism);
      }
      context.out.write("Insertion successfully.\n");
   }
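
The new SOURCE command above reads a statement file and feeds it through the
same SQL path. A standalone sketch of that idea using the same commons-io call
(the file path is a placeholder, and printing stands in for runSqlList):

```java
import org.apache.commons.io.IOUtils;

import java.io.FileInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class SourceCommandSketch {
  public static void main(String[] args) throws IOException {
    String sqlFile = args.length > 0 ? args[0] : "/tmp/statements.sql"; // placeholder path
    try (FileInputStream in = new FileInputStream(sqlFile)) {
      String sql = IOUtils.toString(in, StandardCharsets.UTF_8);
      System.out.println("would run:\n" + sql);  // stand-in for runSqlList(sql, context)
    }
  }
}
```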
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java
index 6ba9f98..6f7b326 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java
@@ -69,50 +69,37 @@ public class FlinkStreamSqlInterpreter extends FlinkSqlInterrpeter {
                 .setString("execution.savepoint.path", savepointPath.toString());
       }
     }
-    int defaultSqlParallelism = this.tbenv.getConfig().getConfiguration()
-            .getInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);
-    try {
-      if (context.getLocalProperties().containsKey("parallelism")) {
-        this.tbenv.getConfig().getConfiguration()
-                .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
-                        Integer.parseInt(context.getLocalProperties().get("parallelism")));
-      }
 
-      String streamType = context.getLocalProperties().get("type");
-      if (streamType == null) {
-        throw new IOException("type must be specified for stream sql");
-      }
-      if (streamType.equalsIgnoreCase("single")) {
-        SingleRowStreamSqlJob streamJob = new SingleRowStreamSqlJob(
-                flinkInterpreter.getStreamExecutionEnvironment(),
-                tbenv,
-                flinkInterpreter.getJobManager(),
-                context,
-                flinkInterpreter.getDefaultParallelism());
-        streamJob.run(sql);
-      } else if (streamType.equalsIgnoreCase("append")) {
-        AppendStreamSqlJob streamJob = new AppendStreamSqlJob(
-                flinkInterpreter.getStreamExecutionEnvironment(),
-                flinkInterpreter.getStreamTableEnvironment(),
-                flinkInterpreter.getJobManager(),
-                context,
-                flinkInterpreter.getDefaultParallelism());
-        streamJob.run(sql);
-      } else if (streamType.equalsIgnoreCase("update")) {
-        UpdateStreamSqlJob streamJob = new UpdateStreamSqlJob(
-                flinkInterpreter.getStreamExecutionEnvironment(),
-                flinkInterpreter.getStreamTableEnvironment(),
-                flinkInterpreter.getJobManager(),
-                context,
-                flinkInterpreter.getDefaultParallelism());
-        streamJob.run(sql);
-      } else {
-        throw new IOException("Unrecognized stream type: " + streamType);
-      }
-    } finally {
-      this.tbenv.getConfig().getConfiguration()
-              .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
-                      defaultSqlParallelism);
+    String streamType = context.getLocalProperties().get("type");
+    if (streamType == null) {
+      throw new IOException("type must be specified for stream sql");
+    }
+    if (streamType.equalsIgnoreCase("single")) {
+      SingleRowStreamSqlJob streamJob = new SingleRowStreamSqlJob(
+              flinkInterpreter.getStreamExecutionEnvironment(),
+              tbenv,
+              flinkInterpreter.getJobManager(),
+              context,
+              flinkInterpreter.getDefaultParallelism());
+      streamJob.run(sql);
+    } else if (streamType.equalsIgnoreCase("append")) {
+      AppendStreamSqlJob streamJob = new AppendStreamSqlJob(
+              flinkInterpreter.getStreamExecutionEnvironment(),
+              flinkInterpreter.getStreamTableEnvironment(),
+              flinkInterpreter.getJobManager(),
+              context,
+              flinkInterpreter.getDefaultParallelism());
+      streamJob.run(sql);
+    } else if (streamType.equalsIgnoreCase("update")) {
+      UpdateStreamSqlJob streamJob = new UpdateStreamSqlJob(
+              flinkInterpreter.getStreamExecutionEnvironment(),
+              flinkInterpreter.getStreamTableEnvironment(),
+              flinkInterpreter.getJobManager(),
+              context,
+              flinkInterpreter.getDefaultParallelism());
+      streamJob.run(sql);
+    } else {
+      throw new IOException("Unrecognized stream type: " + streamType);
     }
   }
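
The per-statement parallelism handling removed above now lives in the base
class (callSelect/callInsertInto). A sketch of that save/override/restore
pattern against Flink's real Configuration API; the hard-coded override value
stands in for the paragraph's `parallelism` local property:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.config.ExecutionConfigOptions;

public class ParallelismOverrideSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // capture the session-wide default once
    int defaultSqlParallelism =
        conf.getInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);
    try {
      // per-paragraph override, e.g. %flink.ssql(parallelism=4)
      conf.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
      System.out.println("run statement with parallelism "
          + conf.getInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM));
    } finally {
      // always restore, even if the statement fails
      conf.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
          defaultSqlParallelism);
    }
  }
}
```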
 
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java
index 8607d76..970f6cf 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java
@@ -39,13 +39,17 @@ public class IPyFlinkInterpreter extends IPythonInterpreter {
 
   private FlinkInterpreter flinkInterpreter;
   private InterpreterContext curInterpreterContext;
+  private boolean opened = false;
 
   public IPyFlinkInterpreter(Properties property) {
     super(property);
   }
 
   @Override
-  public void open() throws InterpreterException {
+  public synchronized void open() throws InterpreterException {
+    if (opened) {
+      return;
+    }
     FlinkInterpreter pyFlinkInterpreter =
         getInterpreterInTheSameSessionByClassName(FlinkInterpreter.class, false);
     setProperty("zeppelin.python",
@@ -53,6 +57,7 @@ public class IPyFlinkInterpreter extends IPythonInterpreter {
     flinkInterpreter = getInterpreterInTheSameSessionByClassName(FlinkInterpreter.class);
     setAdditionalPythonInitFile("python/zeppelin_ipyflink.py");
     super.open();
+    opened = true;
   }
 
   @Override
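
The `opened` flag above makes open() safe to call more than once. A standalone
sketch of the same guard (illustrative class, no Zeppelin dependencies):

```java
public class IdempotentOpenSketch {
  private boolean opened = false;

  public synchronized void open() {
    if (opened) {
      return;              // already initialized; repeated calls are no-ops
    }
    System.out.println("initializing interpreter once");
    opened = true;
  }

  public static void main(String[] args) {
    IdempotentOpenSketch intp = new IdempotentOpenSketch();
    intp.open();
    intp.open();           // prints nothing the second time
  }
}
```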
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/JobManager.java b/flink/src/main/java/org/apache/zeppelin/flink/JobManager.java
index 3a59079..e148cf5 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/JobManager.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/JobManager.java
@@ -54,6 +54,7 @@ public class JobManager {
     String paragraphId = context.getParagraphId();
     JobClient previousJobClient = this.jobs.put(paragraphId, jobClient);
     FlinkJobProgressPoller thread = new FlinkJobProgressPoller(flinkWebUI, jobClient.getJobID(), context);
+    thread.setName("JobProgressPoller-Thread-" + paragraphId);
     thread.start();
     this.jobProgressPollerMap.put(jobClient.getJobID(), thread);
     if (previousJobClient != null) {
@@ -135,6 +136,7 @@ public class JobManager {
     }
 
     FlinkJobProgressPoller jobProgressPoller = jobProgressPollerMap.remove(jobClient.getJobID());
+    jobProgressPoller.cancel();
     jobProgressPoller.interrupt();
   }
 
@@ -156,7 +158,6 @@ public class JobManager {
 
     @Override
     public void run() {
-
       while (!Thread.currentThread().isInterrupted() && running.get()) {
         try {
           JsonNode rootNode = Unirest.get(flinkWebUI + "/jobs/" + jobId.toString())
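
Two small lifecycle fixes above: the poller thread gets a recognizable name
(useful in thread dumps), and it is stopped via a cancel flag plus interrupt
rather than interrupt alone. A runnable sketch of that pattern (names are
illustrative):

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class PollerLifecycleSketch extends Thread {
  private final AtomicBoolean running = new AtomicBoolean(true);

  @Override
  public void run() {
    while (!Thread.currentThread().isInterrupted() && running.get()) {
      try {
        Thread.sleep(1000);                    // stand-in for polling the Flink REST API
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();    // restore the interrupt flag and exit
      }
    }
  }

  void cancel() {
    running.set(false);                        // cooperative stop, checked each loop
  }

  public static void main(String[] args) throws InterruptedException {
    PollerLifecycleSketch poller = new PollerLifecycleSketch();
    poller.setName("JobProgressPoller-Thread-paragraph_1");  // illustrative paragraph id
    poller.start();
    poller.cancel();
    poller.interrupt();
    poller.join();
  }
}
```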
diff --git a/flink/src/main/resources/interpreter-setting.json b/flink/src/main/resources/interpreter-setting.json
index 65aac9e..ea92e45 100644
--- a/flink/src/main/resources/interpreter-setting.json
+++ b/flink/src/main/resources/interpreter-setting.json
@@ -68,6 +68,13 @@
         "description": "yarn queue name",
         "type": "string"
       },
+      "flink.webui.yarn.useProxy": {
+        "envName": null,
+        "propertyName": null,
+        "defaultValue": false,
+        "description": "whether use yarn proxy url as flink weburl, e.g. 
http://localhost:8088/proxy/application_1583396598068_0004";,
+        "type": "checkbox"
+      },
       "flink.udf.jars": {
         "envName": null,
         "propertyName": null,
@@ -103,6 +110,13 @@
         "description": "whether enable hive",
         "type": "checkbox"
       },
+      "zeppelin.flink.hive.version": {
+        "envName": null,
+        "propertyName": null,
+        "defaultValue": "2.3.4",
+        "description": "hive version that you would like to connect",
+        "type": "string"
+      },
       "zeppelin.flink.printREPLOutput": {
         "envName": null,
         "propertyName": "zeppelin.flink.printREPLOutput",
@@ -127,9 +141,16 @@
       "flink.interpreter.close.shutdown_cluster": {
         "envName": "flink.interpreter.close.shutdown_cluster",
         "propertyName": "flink.interpreter.close.shutdown_cluster",
-        "defaultValue": "true",
+        "defaultValue": true,
         "description": "Whether shutdown application when close interpreter",
-        "type": "string"
+        "type": "checkbox"
+      },
+      "zeppelin.interpreter.close.cancel_job": {
+        "envName": "zeppelin.interpreter.close.cancel_job",
+        "propertyName": "zeppelin.interpreter.close.cancel_job",
+        "defaultValue": true,
+        "description": "Whether cancel flink job when closing interpreter",
+        "type": "checkbox"
       }
     },
     "editor": {
diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index 29bd699..02ffaf2 100644
--- a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -18,7 +18,7 @@
 
 package org.apache.zeppelin.flink
 
-import java.io.{BufferedReader, File}
+import java.io.{BufferedReader, File, IOException}
 import java.net.{URL, URLClassLoader}
 import java.nio.file.Files
 import java.util.{Map, Properties}
@@ -34,7 +34,9 @@ import org.apache.flink.configuration._
 import org.apache.flink.core.execution.{JobClient, JobListener}
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironmentFactory, StreamExecutionEnvironment => JStreamExecutionEnvironment}
+import org.apache.flink.api.java.{ExecutionEnvironmentFactory, ExecutionEnvironment => JExecutionEnvironment}
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl
 import org.apache.flink.table.api.scala.{BatchTableEnvironment, StreamTableEnvironment}
 import org.apache.flink.table.api.{EnvironmentSettings, TableConfig, TableEnvironment}
@@ -43,6 +45,11 @@ import org.apache.flink.table.catalog.hive.HiveCatalog
 import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableAggregateFunction, TableFunction}
 import org.apache.flink.table.module.ModuleManager
 import org.apache.flink.table.module.hive.HiveModule
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.yarn.api.records.ApplicationId
+import org.apache.hadoop.yarn.client.api.YarnClient
+import org.apache.hadoop.yarn.conf
+import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.zeppelin.flink.util.DependencyUtils
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion
 import org.apache.zeppelin.interpreter.util.InterpreterOutputStream
@@ -95,6 +102,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
   private var jmWebUrl: String = _
   private var jobManager: JobManager = _
   private var defaultParallelism = 1;
+  private var defaultSqlParallelism = 1;
   private var userJars: Seq[String] = _
 
   def open(): Unit = {
@@ -149,7 +157,9 @@ class FlinkScalaInterpreter(val properties: Properties) {
     // load other configuration from interpreter properties
     properties.asScala.foreach(entry => configuration.setString(entry._1, entry._2))
     this.defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM)
+    this.defaultSqlParallelism = configuration.getInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM)
     LOGGER.info("Default Parallelism: " + this.defaultParallelism)
+    LOGGER.info("Default SQL Parallelism: " + this.defaultSqlParallelism)
 
     // set scala.color
     if (properties.getProperty("zeppelin.flink.scala.color", 
"true").toBoolean) {
@@ -187,13 +197,27 @@ class FlinkScalaInterpreter(val properties: Properties) {
           // local mode or yarn
           if (mode == ExecutionMode.LOCAL) {
             LOGGER.info("Starting FlinkCluster in local mode")
+            this.jmWebUrl = clusterClient.getWebInterfaceURL
           } else if (mode == ExecutionMode.YARN) {
             LOGGER.info("Starting FlinkCluster in yarn mode")
+            if (properties.getProperty("flink.webui.yarn.useProxy", "false").toBoolean) {
+              val yarnAppId = clusterClient.getClusterId.asInstanceOf[ApplicationId]
+              val yarnClient = YarnClient.createYarnClient
+              val yarnConf = new YarnConfiguration()
+              // disable timeline service as we only query yarn app here.
+              // Otherwise we may hit this kind of ERROR:
+              // java.lang.ClassNotFoundException: com.sun.jersey.api.client.config.ClientConfig
+              yarnConf.set("yarn.timeline-service.enabled", "false")
+              yarnClient.init(yarnConf)
+              yarnClient.start()
+              val appReport = yarnClient.getApplicationReport(yarnAppId)
+              this.jmWebUrl = appReport.getTrackingUrl
+            } else {
+              this.jmWebUrl = clusterClient.getWebInterfaceURL
+            }
           } else {
             throw new Exception("Starting FlinkCluster in invalid mode: " + 
mode)
           }
-
-          this.jmWebUrl = clusterClient.getWebInterfaceURL;
         case None =>
           // remote mode
           LOGGER.info("Use FlinkCluster in remote mode")
@@ -458,14 +482,23 @@ class FlinkScalaInterpreter(val properties: Properties) {
   }
 
   def setAsContext(): Unit = {
-    val factory = new StreamExecutionEnvironmentFactory() {
+    val streamFactory = new StreamExecutionEnvironmentFactory() {
       override def createExecutionEnvironment = senv.getJavaEnv
     }
     //StreamExecutionEnvironment
-    val method = classOf[JStreamExecutionEnvironment].getDeclaredMethod("initializeContextEnvironment",
+    var method = classOf[JStreamExecutionEnvironment].getDeclaredMethod("initializeContextEnvironment",
       classOf[StreamExecutionEnvironmentFactory])
     method.setAccessible(true)
-    method.invoke(null, factory);
+    method.invoke(null, streamFactory);
+
+    val batchFactory = new ExecutionEnvironmentFactory() {
+      override def createExecutionEnvironment = benv.getJavaEnv
+    }
+    //ExecutionEnvironment
+    method = classOf[JExecutionEnvironment].getDeclaredMethod("initializeContextEnvironment",
+      classOf[ExecutionEnvironmentFactory])
+    method.setAccessible(true)
+    method.invoke(null, batchFactory);
   }
 
   // for use in java side
@@ -580,10 +613,15 @@ class FlinkScalaInterpreter(val properties: Properties) {
             LOGGER.info("Shutdown FlinkCluster")
             clusterClient.shutDownCluster()
             clusterClient.close()
+            // delete staging dir
+            if (mode == ExecutionMode.YARN) {
+              cleanupStagingDirInternal(clusterClient.getClusterId.asInstanceOf[ApplicationId])
+            }
           case None =>
             LOGGER.info("Don't close the Remote FlinkCluster")
         }
       }
+
     } else {
       LOGGER.info("Keep cluster alive when closing interpreter")
     }
@@ -594,6 +632,19 @@ class FlinkScalaInterpreter(val properties: Properties) {
     }
   }
 
+  private def cleanupStagingDirInternal(appId: ApplicationId): Unit = {
+    try {
+      val fs = FileSystem.get(new org.apache.hadoop.conf.Configuration())
+      val stagingDirPath = new Path(fs.getHomeDirectory, ".flink/" + appId.toString)
+      if (fs.delete(stagingDirPath, true)) {
+        LOGGER.info(s"Deleted staging directory $stagingDirPath")
+      }
+    } catch {
+      case ioe: IOException =>
+        LOGGER.warn("Failed to cleanup staging dir", ioe)
+    }
+  }
+
   def getExecutionEnvironment(): ExecutionEnvironment = this.benv
 
   def getStreamExecutionEnvironment(): StreamExecutionEnvironment = this.senv
@@ -620,6 +671,8 @@ class FlinkScalaInterpreter(val properties: Properties) {
 
   def getDefaultParallelism = this.defaultParallelism
 
+  def getDefaultSqlParallelism = this.defaultSqlParallelism
+
   def getUserJars: Seq[String] = {
     val flinkJars =
       if (!StringUtils.isBlank(properties.getProperty("flink.execution.jars", ""))) {
