This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new 4767c5f [ZEPPELIN-4656]. Improvement of FlinkInterpreter 4767c5f is described below commit 4767c5f3640ab02f08f1928f1b74a9119e379354 Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Sun Mar 1 13:56:02 2020 +0800 [ZEPPELIN-4656]. Improvement of FlinkInterpreter ### What is this PR for? This is a follow-up PR of ZEPPELIN-4488. What is done in this PR: 1. Add missing properties in `interpreter-setting.json` 2. Fix the concurrency issue when running multiple SQL statements simultaneously. 3. Add setting `flink.webui.yarn.useProxy` to allow using the YARN proxy URL as the Flink web UI URL. 4. Delete the staging dir in YARN mode after the cluster is shut down ### What type of PR is it? [Bug Fix | Improvement | Documentation ] ### Todos * [ ] - Task ### What is the Jira issue? * https://jira.apache.org/jira/browse/ZEPPELIN-4656 ### How should this be tested? * CI pass ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #3676 from zjffdu/ZEPPELIN-4656 and squashes the following commits: 06c3c0a85 [Jeff Zhang] [ZEPPELIN-4656]. 
Improvement of FlinkInterpreter --- docs/interpreter/flink.md | 15 ++++ .../zeppelin/flink/FlinkBatchSqlInterpreter.java | 21 +---- .../apache/zeppelin/flink/FlinkInterpreter.java | 15 +++- .../apache/zeppelin/flink/FlinkSqlInterrpeter.java | 90 +++++++++++++++------- .../zeppelin/flink/FlinkStreamSqlInterpreter.java | 73 ++++++++---------- .../apache/zeppelin/flink/IPyFlinkInterpreter.java | 7 +- .../java/org/apache/zeppelin/flink/JobManager.java | 3 +- flink/src/main/resources/interpreter-setting.json | 25 +++++- .../zeppelin/flink/FlinkScalaInterpreter.scala | 65 ++++++++++++++-- 9 files changed, 216 insertions(+), 98 deletions(-) diff --git a/docs/interpreter/flink.md b/docs/interpreter/flink.md index dfc3b57..ffef1e7 100644 --- a/docs/interpreter/flink.md +++ b/docs/interpreter/flink.md @@ -132,6 +132,11 @@ You can also set other flink properties which are not listed in the table. For a <td>queue name of yarn app</td> </tr> <tr> + <td>flink.webui.yarn.useProxy</td> + <td>false</td> + <td>whether use yarn proxy url as flink weburl, e.g. http://localhost:8088/proxy/application_1583396598068_0004</td> + </tr> + <tr> <td>flink.udf.jars</td> <td></td> <td>udf jars (comma separated), zeppelin will register udf in this jar automatically for user. The udf name is the class name.</td> @@ -186,6 +191,16 @@ You can also set other flink properties which are not listed in the table. 
For a <td>1000</td> <td>max number of row returned by sql interpreter</td> </tr> + <tr> + <td>flink.interpreter.close.shutdown_cluster</td> + <td>true</td> + <td>Whether shutdown application when closing interpreter</td> + </tr> + <tr> + <td>zeppelin.interpreter.close.cancel_job</td> + <td>true</td> + <td>Whether cancel flink job when closing interpreter</td> + </tr> </table> diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java index e3a12e4..ba5319c 100644 --- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java +++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java @@ -55,23 +55,10 @@ public class FlinkBatchSqlInterpreter extends FlinkSqlInterrpeter { @Override public void callInnerSelect(String sql, InterpreterContext context) throws IOException { - int defaultSqlParallelism = this.tbenv.getConfig().getConfiguration() - .getInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM); - try { - if (context.getLocalProperties().containsKey("parallelism")) { - this.tbenv.getConfig().getConfiguration() - .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, - Integer.parseInt(context.getLocalProperties().get("parallelism"))); - } - Table table = this.tbenv.sqlQuery(sql); - z.setCurrentSql(sql); - String result = z.showData(table); - context.out.write(result); - } finally { - this.tbenv.getConfig().getConfiguration() - .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, - defaultSqlParallelism); - } + Table table = this.tbenv.sqlQuery(sql); + z.setCurrentSql(sql); + String result = z.showData(table); + context.out.write(result); } @Override diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java index e32df4f..b089b36 100644 --- 
a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java +++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java @@ -66,7 +66,16 @@ public class FlinkInterpreter extends Interpreter { this.z.setInterpreterContext(context); this.z.setGui(context.getGui()); this.z.setNoteGui(context.getNoteGui()); - return innerIntp.interpret(st, context); + + // set ClassLoader of current Thread to be the ClassLoader of Flink scala-shell, + // otherwise codegen will fail to find classes defined in scala-shell + ClassLoader originClassLoader = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(getFlinkScalaShellLoader()); + return innerIntp.interpret(st, context); + } finally { + Thread.currentThread().setContextClassLoader(originClassLoader); + } } @Override @@ -124,6 +133,10 @@ public class FlinkInterpreter extends Interpreter { return this.innerIntp.getDefaultParallelism(); } + int getDefaultSqlParallelism() { + return this.innerIntp.getDefaultSqlParallelism(); + } + public ClassLoader getFlinkScalaShellLoader() { return innerIntp.getFlinkScalaShellLoader(); } diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java index 5ebc33f..d794b4b 100644 --- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java +++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java @@ -18,6 +18,7 @@ package org.apache.zeppelin.flink; +import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.flink.api.common.JobExecutionResult; @@ -26,6 +27,7 @@ import org.apache.flink.core.execution.JobListener; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.TableSchema; +import 
org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.zeppelin.flink.sql.SqlCommandParser; import org.apache.zeppelin.flink.sql.SqlCommandParser.SqlCommand; import org.apache.zeppelin.interpreter.Interpreter; @@ -40,21 +42,25 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; +import java.io.File; +import java.io.FileInputStream; import java.io.IOException; import java.util.List; import java.util.Optional; import java.util.Properties; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; public abstract class FlinkSqlInterrpeter extends Interpreter { protected static final Logger LOGGER = LoggerFactory.getLogger(FlinkSqlInterrpeter.class); - protected static final String MESSAGE_HELP = new AttributedStringBuilder() + public static final AttributedString MESSAGE_HELP = new AttributedStringBuilder() .append("The following commands are available:\n\n") .append(formatCommand(SqlCommand.CREATE_TABLE, "Create table under current catalog and database.")) - .append(formatCommand(SqlCommand.DROP_TABLE, - "Drop table with optional catalog and database. Syntax: 'DROP TABLE [IF EXISTS] <name>;'")) + .append(formatCommand(SqlCommand.DROP_TABLE, "Drop table with optional catalog and database. Syntax: 'DROP TABLE [IF EXISTS] <name>;'")) .append(formatCommand(SqlCommand.CREATE_VIEW, "Creates a virtual table from a SQL query. Syntax: 'CREATE VIEW <name> AS <query>;'")) .append(formatCommand(SqlCommand.DESCRIBE, "Describes the schema of a table with the given name.")) .append(formatCommand(SqlCommand.DROP_VIEW, "Deletes a previously created virtual table. 
Syntax: 'DROP VIEW <name>;'")) @@ -65,20 +71,22 @@ public abstract class FlinkSqlInterrpeter extends Interpreter { .append(formatCommand(SqlCommand.SELECT, "Executes a SQL SELECT query on the Flink cluster.")) .append(formatCommand(SqlCommand.SHOW_FUNCTIONS, "Shows all user-defined and built-in functions.")) .append(formatCommand(SqlCommand.SHOW_TABLES, "Shows all registered tables.")) + .append(formatCommand(SqlCommand.SOURCE, "Reads a SQL SELECT query from a file and executes it on the Flink cluster.")) .append(formatCommand(SqlCommand.USE_CATALOG, "Sets the current catalog. The current database is set to the catalog's default one. Experimental! Syntax: 'USE CATALOG <name>;'")) .append(formatCommand(SqlCommand.USE, "Sets the current default database. Experimental! Syntax: 'USE <name>;'")) .style(AttributedStyle.DEFAULT.underline()) .append("\nHint") .style(AttributedStyle.DEFAULT) .append(": Make sure that a statement ends with ';' for finalizing (multi-line) statements.") - .toAttributedString() - .toString(); + .toAttributedString(); protected FlinkInterpreter flinkInterpreter; protected TableEnvironment tbenv; protected TableEnvironment tbenv_2; private SqlSplitter sqlSplitter; - private ReentrantLock lock = new ReentrantLock(); + private int defaultSqlParallelism; + private ReentrantReadWriteLock.WriteLock lock = new ReentrantReadWriteLock().writeLock(); + public FlinkSqlInterrpeter(Properties properties) { super(properties); @@ -94,8 +102,10 @@ public abstract class FlinkSqlInterrpeter extends Interpreter { JobListener jobListener = new JobListener() { @Override public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable) { - lock.unlock(); - LOGGER.info("UnLock JobSubmitLock"); + if (lock.isHeldByCurrentThread()) { + lock.unlock(); + LOGGER.info("UnLock JobSubmitLock"); + } } @Override @@ -106,6 +116,7 @@ public abstract class FlinkSqlInterrpeter extends Interpreter { 
flinkInterpreter.getExecutionEnvironment().getJavaEnv().registerJobListener(jobListener); flinkInterpreter.getStreamExecutionEnvironment().getJavaEnv().registerJobListener(jobListener); + this.defaultSqlParallelism = flinkInterpreter.getDefaultSqlParallelism(); } @Override @@ -134,7 +145,7 @@ public abstract class FlinkSqlInterrpeter extends Interpreter { if (!sqlCommand.isPresent()) { try { context.out.write("%text Invalid Sql statement: " + sql + "\n"); - context.out.write(MESSAGE_HELP); + context.out.write(MESSAGE_HELP.toString()); } catch (IOException e) { return new InterpreterResult(InterpreterResult.Code.ERROR, e.toString()); } @@ -175,6 +186,9 @@ public abstract class FlinkSqlInterrpeter extends Interpreter { case SHOW_TABLES: callShowTables(context); break; + case SOURCE: + callSource(cmdCall.operands[0], context); + break; case SHOW_FUNCTIONS: callShowFunctions(context); break; @@ -231,10 +245,10 @@ public abstract class FlinkSqlInterrpeter extends Interpreter { private void callAlterTable(String sql, InterpreterContext context) throws IOException { try { - lock.tryLock(); + lock.lock(); this.tbenv.sqlUpdate(sql); } finally { - if (lock.isLocked()) { + if (lock.isHeldByCurrentThread()) { lock.unlock(); } } @@ -243,10 +257,10 @@ public abstract class FlinkSqlInterrpeter extends Interpreter { private void callAlterDatabase(String sql, InterpreterContext context) throws IOException { try { - lock.tryLock(); + lock.lock(); this.tbenv.sqlUpdate(sql); } finally { - if (lock.isLocked()) { + if (lock.isHeldByCurrentThread()) { lock.unlock(); } } @@ -257,7 +271,7 @@ public abstract class FlinkSqlInterrpeter extends Interpreter { try { this.tbenv.sqlUpdate(sql); } finally { - if (lock.isLocked()) { + if (lock.isHeldByCurrentThread()) { lock.unlock(); } } @@ -268,7 +282,7 @@ public abstract class FlinkSqlInterrpeter extends Interpreter { try { this.tbenv.sqlUpdate(sql); } finally { - if (lock.isLocked()) { + if (lock.isHeldByCurrentThread()) { lock.unlock(); } } @@ 
-285,7 +299,7 @@ public abstract class FlinkSqlInterrpeter extends Interpreter { lock.lock(); this.tbenv.createTemporaryView(name, tbenv.sqlQuery(query)); } finally { - if (lock.isLocked()) { + if (lock.isHeldByCurrentThread()) { lock.unlock(); } } @@ -294,10 +308,10 @@ public abstract class FlinkSqlInterrpeter extends Interpreter { private void callCreateTable(String sql, InterpreterContext context) throws IOException { try { - lock.tryLock(); + lock.lock(); this.tbenv.sqlUpdate(sql); } finally { - if (lock.isLocked()) { + if (lock.isHeldByCurrentThread()) { lock.unlock(); } } @@ -306,10 +320,10 @@ public abstract class FlinkSqlInterrpeter extends Interpreter { private void callDropTable(String sql, InterpreterContext context) throws IOException { try { - lock.tryLock(); + lock.lock(); this.tbenv.sqlUpdate(sql); } finally { - if (lock.isLocked()) { + if (lock.isHeldByCurrentThread()) { lock.unlock(); } } @@ -326,7 +340,7 @@ public abstract class FlinkSqlInterrpeter extends Interpreter { } private void callHelp(InterpreterContext context) throws IOException { - context.out.write(MESSAGE_HELP); + context.out.write(MESSAGE_HELP.toString()); } private void callShowCatalogs(InterpreterContext context) throws IOException { @@ -346,6 +360,11 @@ public abstract class FlinkSqlInterrpeter extends Interpreter { "%table table\n" + StringUtils.join(tables, "\n") + "\n"); } + private void callSource(String sqlFile, InterpreterContext context) throws IOException { + String sql = IOUtils.toString(new FileInputStream(sqlFile)); + runSqlList(sql, context); + } + private void callShowFunctions(InterpreterContext context) throws IOException { String[] functions = this.tbenv.listUserDefinedFunctions(); context.out.write( @@ -369,11 +388,11 @@ public abstract class FlinkSqlInterrpeter extends Interpreter { private void callExplain(String sql, InterpreterContext context) throws IOException { try { - lock.tryLock(); + lock.lock(); Table table = this.tbenv.sqlQuery(sql); 
context.out.write(this.tbenv.explain(table) + "\n"); } finally { - if (lock.isLocked()) { + if (lock.isHeldByCurrentThread()) { lock.unlock(); } } @@ -381,12 +400,21 @@ public abstract class FlinkSqlInterrpeter extends Interpreter { public void callSelect(String sql, InterpreterContext context) throws IOException { try { - lock.tryLock(); + lock.lock(); + if (context.getLocalProperties().containsKey("parallelism")) { + this.tbenv.getConfig().getConfiguration() + .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, + Integer.parseInt(context.getLocalProperties().get("parallelism"))); + } callInnerSelect(sql, context); + } finally { - if (lock.isLocked()) { + if (lock.isHeldByCurrentThread()) { lock.unlock(); } + this.tbenv.getConfig().getConfiguration() + .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, + defaultSqlParallelism); } } @@ -398,15 +426,23 @@ public abstract class FlinkSqlInterrpeter extends Interpreter { context.getLocalProperties().put("flink.streaming.insert_into", "true"); } try { - lock.tryLock(); + lock.lock(); + if (context.getLocalProperties().containsKey("parallelism")) { + this.tbenv.getConfig().getConfiguration() + .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, + Integer.parseInt(context.getLocalProperties().get("parallelism"))); + } this.tbenv.sqlUpdate(sql); this.tbenv.execute(sql); } catch (Exception e) { throw new IOException(e); } finally { - if (lock.isLocked()) { + if (lock.isHeldByCurrentThread()) { lock.unlock(); } + this.tbenv.getConfig().getConfiguration() + .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, + defaultSqlParallelism); } context.out.write("Insertion successfully.\n"); } diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java index 6ba9f98..6f7b326 100644 --- 
a/flink/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java +++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java @@ -69,50 +69,37 @@ public class FlinkStreamSqlInterpreter extends FlinkSqlInterrpeter { .setString("execution.savepoint.path", savepointPath.toString()); } } - int defaultSqlParallelism = this.tbenv.getConfig().getConfiguration() - .getInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM); - try { - if (context.getLocalProperties().containsKey("parallelism")) { - this.tbenv.getConfig().getConfiguration() - .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, - Integer.parseInt(context.getLocalProperties().get("parallelism"))); - } - String streamType = context.getLocalProperties().get("type"); - if (streamType == null) { - throw new IOException("type must be specified for stream sql"); - } - if (streamType.equalsIgnoreCase("single")) { - SingleRowStreamSqlJob streamJob = new SingleRowStreamSqlJob( - flinkInterpreter.getStreamExecutionEnvironment(), - tbenv, - flinkInterpreter.getJobManager(), - context, - flinkInterpreter.getDefaultParallelism()); - streamJob.run(sql); - } else if (streamType.equalsIgnoreCase("append")) { - AppendStreamSqlJob streamJob = new AppendStreamSqlJob( - flinkInterpreter.getStreamExecutionEnvironment(), - flinkInterpreter.getStreamTableEnvironment(), - flinkInterpreter.getJobManager(), - context, - flinkInterpreter.getDefaultParallelism()); - streamJob.run(sql); - } else if (streamType.equalsIgnoreCase("update")) { - UpdateStreamSqlJob streamJob = new UpdateStreamSqlJob( - flinkInterpreter.getStreamExecutionEnvironment(), - flinkInterpreter.getStreamTableEnvironment(), - flinkInterpreter.getJobManager(), - context, - flinkInterpreter.getDefaultParallelism()); - streamJob.run(sql); - } else { - throw new IOException("Unrecognized stream type: " + streamType); - } - } finally { - this.tbenv.getConfig().getConfiguration() - 
.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, - defaultSqlParallelism); + String streamType = context.getLocalProperties().get("type"); + if (streamType == null) { + throw new IOException("type must be specified for stream sql"); + } + if (streamType.equalsIgnoreCase("single")) { + SingleRowStreamSqlJob streamJob = new SingleRowStreamSqlJob( + flinkInterpreter.getStreamExecutionEnvironment(), + tbenv, + flinkInterpreter.getJobManager(), + context, + flinkInterpreter.getDefaultParallelism()); + streamJob.run(sql); + } else if (streamType.equalsIgnoreCase("append")) { + AppendStreamSqlJob streamJob = new AppendStreamSqlJob( + flinkInterpreter.getStreamExecutionEnvironment(), + flinkInterpreter.getStreamTableEnvironment(), + flinkInterpreter.getJobManager(), + context, + flinkInterpreter.getDefaultParallelism()); + streamJob.run(sql); + } else if (streamType.equalsIgnoreCase("update")) { + UpdateStreamSqlJob streamJob = new UpdateStreamSqlJob( + flinkInterpreter.getStreamExecutionEnvironment(), + flinkInterpreter.getStreamTableEnvironment(), + flinkInterpreter.getJobManager(), + context, + flinkInterpreter.getDefaultParallelism()); + streamJob.run(sql); + } else { + throw new IOException("Unrecognized stream type: " + streamType); } } diff --git a/flink/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java index 8607d76..970f6cf 100644 --- a/flink/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java +++ b/flink/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java @@ -39,13 +39,17 @@ public class IPyFlinkInterpreter extends IPythonInterpreter { private FlinkInterpreter flinkInterpreter; private InterpreterContext curInterpreterContext; + private boolean opened = false; public IPyFlinkInterpreter(Properties property) { super(property); } @Override - public void open() throws InterpreterException { + public synchronized void open() throws 
InterpreterException { + if (opened) { + return; + } FlinkInterpreter pyFlinkInterpreter = getInterpreterInTheSameSessionByClassName(FlinkInterpreter.class, false); setProperty("zeppelin.python", @@ -53,6 +57,7 @@ public class IPyFlinkInterpreter extends IPythonInterpreter { flinkInterpreter = getInterpreterInTheSameSessionByClassName(FlinkInterpreter.class); setAdditionalPythonInitFile("python/zeppelin_ipyflink.py"); super.open(); + opened = true; } @Override diff --git a/flink/src/main/java/org/apache/zeppelin/flink/JobManager.java b/flink/src/main/java/org/apache/zeppelin/flink/JobManager.java index 3a59079..e148cf5 100644 --- a/flink/src/main/java/org/apache/zeppelin/flink/JobManager.java +++ b/flink/src/main/java/org/apache/zeppelin/flink/JobManager.java @@ -54,6 +54,7 @@ public class JobManager { String paragraphId = context.getParagraphId(); JobClient previousJobClient = this.jobs.put(paragraphId, jobClient); FlinkJobProgressPoller thread = new FlinkJobProgressPoller(flinkWebUI, jobClient.getJobID(), context); + thread.setName("JobProgressPoller-Thread-" + paragraphId); thread.start(); this.jobProgressPollerMap.put(jobClient.getJobID(), thread); if (previousJobClient != null) { @@ -135,6 +136,7 @@ public class JobManager { } FlinkJobProgressPoller jobProgressPoller = jobProgressPollerMap.remove(jobClient.getJobID()); + jobProgressPoller.cancel(); jobProgressPoller.interrupt(); } @@ -156,7 +158,6 @@ public class JobManager { @Override public void run() { - while (!Thread.currentThread().isInterrupted() && running.get()) { try { JsonNode rootNode = Unirest.get(flinkWebUI + "/jobs/" + jobId.toString()) diff --git a/flink/src/main/resources/interpreter-setting.json b/flink/src/main/resources/interpreter-setting.json index 65aac9e..ea92e45 100644 --- a/flink/src/main/resources/interpreter-setting.json +++ b/flink/src/main/resources/interpreter-setting.json @@ -68,6 +68,13 @@ "description": "yarn queue name", "type": "string" }, + "flink.webui.yarn.useProxy": { + 
"envName": null, + "propertyName": null, + "defaultValue": false, + "description": "whether use yarn proxy url as flink weburl, e.g. http://localhost:8088/proxy/application_1583396598068_0004", + "type": "checkbox" + }, "flink.udf.jars": { "envName": null, "propertyName": null, @@ -103,6 +110,13 @@ "description": "whether enable hive", "type": "checkbox" }, + "zeppelin.flink.hive.version": { + "envName": null, + "propertyName": null, + "defaultValue": "2.3.4", + "description": "hive version that you would like to connect", + "type": "string" + }, "zeppelin.flink.printREPLOutput": { "envName": null, "propertyName": "zeppelin.flink.printREPLOutput", @@ -127,9 +141,16 @@ "flink.interpreter.close.shutdown_cluster": { "envName": "flink.interpreter.close.shutdown_cluster", "propertyName": "flink.interpreter.close.shutdown_cluster", - "defaultValue": "true", + "defaultValue": true, "description": "Whether shutdown application when close interpreter", - "type": "string" + "type": "checkbox" + }, + "zeppelin.interpreter.close.cancel_job": { + "envName": "zeppelin.interpreter.close.cancel_job", + "propertyName": "zeppelin.interpreter.close.cancel_job", + "defaultValue": true, + "description": "Whether cancel flink job when closing interpreter", + "type": "checkbox" } }, "editor": { diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala index 29bd699..02ffaf2 100644 --- a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala +++ b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala @@ -18,7 +18,7 @@ package org.apache.zeppelin.flink -import java.io.{BufferedReader, File} +import java.io.{BufferedReader, File, IOException} import java.net.{URL, URLClassLoader} import java.nio.file.Files import java.util.{Map, Properties} @@ -34,7 +34,9 @@ import org.apache.flink.configuration._ import 
org.apache.flink.core.execution.{JobClient, JobListener} import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironmentFactory, StreamExecutionEnvironment => JStreamExecutionEnvironment} +import org.apache.flink.api.java.{ExecutionEnvironmentFactory, ExecutionEnvironment => JExecutionEnvironment} import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.table.api.config.ExecutionConfigOptions import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl import org.apache.flink.table.api.scala.{BatchTableEnvironment, StreamTableEnvironment} import org.apache.flink.table.api.{EnvironmentSettings, TableConfig, TableEnvironment} @@ -43,6 +45,11 @@ import org.apache.flink.table.catalog.hive.HiveCatalog import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableAggregateFunction, TableFunction} import org.apache.flink.table.module.ModuleManager import org.apache.flink.table.module.hive.HiveModule +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.yarn.api.records.ApplicationId +import org.apache.hadoop.yarn.client.api.YarnClient +import org.apache.hadoop.yarn.conf +import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.zeppelin.flink.util.DependencyUtils import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion import org.apache.zeppelin.interpreter.util.InterpreterOutputStream @@ -95,6 +102,7 @@ class FlinkScalaInterpreter(val properties: Properties) { private var jmWebUrl: String = _ private var jobManager: JobManager = _ private var defaultParallelism = 1; + private var defaultSqlParallelism = 1; private var userJars: Seq[String] = _ def open(): Unit = { @@ -149,7 +157,9 @@ class FlinkScalaInterpreter(val properties: Properties) { // load other configuration from interpreter properties properties.asScala.foreach(entry => configuration.setString(entry._1, entry._2)) 
this.defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM) + this.defaultSqlParallelism = configuration.getInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM) LOGGER.info("Default Parallelism: " + this.defaultParallelism) + LOGGER.info("Default SQL Parallelism: " + this.defaultSqlParallelism) // set scala.color if (properties.getProperty("zeppelin.flink.scala.color", "true").toBoolean) { @@ -187,13 +197,27 @@ class FlinkScalaInterpreter(val properties: Properties) { // local mode or yarn if (mode == ExecutionMode.LOCAL) { LOGGER.info("Starting FlinkCluster in local mode") + this.jmWebUrl = clusterClient.getWebInterfaceURL } else if (mode == ExecutionMode.YARN) { LOGGER.info("Starting FlinkCluster in yarn mode") + if (properties.getProperty("flink.webui.yarn.useProxy", "false").toBoolean) { + val yarnAppId = clusterClient.getClusterId.asInstanceOf[ApplicationId] + val yarnClient = YarnClient.createYarnClient + val yarnConf = new YarnConfiguration() + // disable timeline service as we only query yarn app here. 
+ // Otherwise we may hit this kind of ERROR: + // java.lang.ClassNotFoundException: com.sun.jersey.api.client.config.ClientConfig + yarnConf.set("yarn.timeline-service.enabled", "false") + yarnClient.init(yarnConf) + yarnClient.start() + val appReport = yarnClient.getApplicationReport(yarnAppId) + this.jmWebUrl = appReport.getTrackingUrl + } else { + this.jmWebUrl = clusterClient.getWebInterfaceURL + } } else { throw new Exception("Starting FlinkCluster in invalid mode: " + mode) } - - this.jmWebUrl = clusterClient.getWebInterfaceURL; case None => // remote mode LOGGER.info("Use FlinkCluster in remote mode") @@ -458,14 +482,23 @@ class FlinkScalaInterpreter(val properties: Properties) { } def setAsContext(): Unit = { - val factory = new StreamExecutionEnvironmentFactory() { + val streamFactory = new StreamExecutionEnvironmentFactory() { override def createExecutionEnvironment = senv.getJavaEnv } //StreamExecutionEnvironment - val method = classOf[JStreamExecutionEnvironment].getDeclaredMethod("initializeContextEnvironment", + var method = classOf[JStreamExecutionEnvironment].getDeclaredMethod("initializeContextEnvironment", classOf[StreamExecutionEnvironmentFactory]) method.setAccessible(true) - method.invoke(null, factory); + method.invoke(null, streamFactory); + + val batchFactory = new ExecutionEnvironmentFactory() { + override def createExecutionEnvironment = benv.getJavaEnv + } + //StreamExecutionEnvironment + method = classOf[JExecutionEnvironment].getDeclaredMethod("initializeContextEnvironment", + classOf[ExecutionEnvironmentFactory]) + method.setAccessible(true) + method.invoke(null, batchFactory); } // for use in java side @@ -580,10 +613,15 @@ class FlinkScalaInterpreter(val properties: Properties) { LOGGER.info("Shutdown FlinkCluster") clusterClient.shutDownCluster() clusterClient.close() + // delete staging dir + if (mode == ExecutionMode.YARN) { + cleanupStagingDirInternal(clusterClient.getClusterId.asInstanceOf[ApplicationId]) + } case None => 
LOGGER.info("Don't close the Remote FlinkCluster") } } + } else { LOGGER.info("Keep cluster alive when closing interpreter") } @@ -594,6 +632,19 @@ class FlinkScalaInterpreter(val properties: Properties) { } } + private def cleanupStagingDirInternal(appId: ApplicationId): Unit = { + try { + val fs = FileSystem.get(new org.apache.hadoop.conf.Configuration()) + val stagingDirPath = new Path(fs.getHomeDirectory, ".flink/" + appId.toString) + if (fs.delete(stagingDirPath, true)) { + LOGGER.info(s"Deleted staging directory $stagingDirPath") + } + } catch { + case ioe: IOException => + LOGGER.warn("Failed to cleanup staging dir", ioe) + } + } + def getExecutionEnvironment(): ExecutionEnvironment = this.benv def getStreamExecutionEnvironment(): StreamExecutionEnvironment = this.senv @@ -620,6 +671,8 @@ class FlinkScalaInterpreter(val properties: Properties) { def getDefaultParallelism = this.defaultParallelism + def getDefaultSqlParallelism = this.defaultSqlParallelism + def getUserJars: Seq[String] = { val flinkJars = if (!StringUtils.isBlank(properties.getProperty("flink.execution.jars", ""))) {