This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new 148129e [ZEPPELIN-5669] Check pyflink folder existence in yarn application mode 148129e is described below commit 148129e4029009f1a6131144917262d2ec71d730 Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Thu Mar 17 10:34:19 2022 +0800 [ZEPPELIN-5669] Check pyflink folder existence in yarn application mode ### What is this PR for? Trivial PR to check the existence of the folder, and throw a meaningful error when it doesn't exist. ### What type of PR is it? [ Improvement ] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-5669 ### How should this be tested? * CI pass ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #4320 from zjffdu/ZEPPELIN-5669 and squashes the following commits: 769d0aa984 [Jeff Zhang] [ZEPPELIN-5669] Check pyflink folder existence in yarn application mode --- .../main/java/org/apache/zeppelin/flink/Flink112Shims.java | 12 ++++++++---- .../main/java/org/apache/zeppelin/flink/Flink113Shims.java | 12 ++++++++---- .../main/java/org/apache/zeppelin/flink/Flink114Shims.java | 12 ++++++++---- 3 files changed, 24 insertions(+), 12 deletions(-) diff --git a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java index e182f97..757e7a4 100644 --- a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java +++ b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java @@ -125,20 +125,24 @@ public class Flink112Shims extends FlinkShims { if ("yarn-application".equalsIgnoreCase(mode)) { // for yarn application mode, FLINK_HOME is container working directory String flinkHome = new File(".").getAbsolutePath(); - 
return getPyFlinkPythonPath(flinkHome + "/lib/python"); + return getPyFlinkPythonPath(new File(flinkHome + "/lib/python")); } String flinkHome = System.getenv("FLINK_HOME"); if (StringUtils.isNotBlank(flinkHome)) { - return getPyFlinkPythonPath(flinkHome + "/opt/python"); + return getPyFlinkPythonPath(new File(flinkHome + "/opt/python")); } else { throw new IOException("No FLINK_HOME is specified"); } } - private String getPyFlinkPythonPath(String pyFlinkFolder) { + private String getPyFlinkPythonPath(File pyFlinkFolder) throws IOException { LOGGER.info("Getting pyflink lib from {}", pyFlinkFolder); - List<File> depFiles = Arrays.asList(new File(pyFlinkFolder).listFiles()); + if (!pyFlinkFolder.exists() || !pyFlinkFolder.isDirectory()) { + throw new IOException(String.format("PyFlink folder %s does not exist or is not a folder", + pyFlinkFolder.getAbsolutePath())); + } + List<File> depFiles = Arrays.asList(pyFlinkFolder.listFiles()); StringBuilder builder = new StringBuilder(); for (File file : depFiles) { LOGGER.info("Adding extracted file {} to PYTHONPATH", file.getAbsolutePath()); diff --git a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java index a4743e8..792174a 100644 --- a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java +++ b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java @@ -128,20 +128,24 @@ public class Flink113Shims extends FlinkShims { if ("yarn-application".equalsIgnoreCase(mode)) { // for yarn application mode, FLINK_HOME is container working directory String flinkHome = new File(".").getAbsolutePath(); - return getPyFlinkPythonPath(flinkHome + "/lib/python"); + return getPyFlinkPythonPath(new File(flinkHome + "/lib/python")); } String flinkHome = System.getenv("FLINK_HOME"); if (StringUtils.isNotBlank(flinkHome)) { - return getPyFlinkPythonPath(flinkHome + "/opt/python"); 
+ return getPyFlinkPythonPath(new File(flinkHome + "/opt/python")); } else { throw new IOException("No FLINK_HOME is specified"); } } - private String getPyFlinkPythonPath(String pyFlinkFolder) { + private String getPyFlinkPythonPath(File pyFlinkFolder) throws IOException { LOGGER.info("Getting pyflink lib from {}", pyFlinkFolder); - List<File> depFiles = Arrays.asList(new File(pyFlinkFolder).listFiles()); + if (!pyFlinkFolder.exists() || !pyFlinkFolder.isDirectory()) { + throw new IOException(String.format("PyFlink folder %s does not exist or is not a folder", + pyFlinkFolder.getAbsolutePath())); + } + List<File> depFiles = Arrays.asList(pyFlinkFolder.listFiles()); StringBuilder builder = new StringBuilder(); for (File file : depFiles) { LOGGER.info("Adding extracted file {} to PYTHONPATH", file.getAbsolutePath()); diff --git a/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java b/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java index e23ac54..2e4e4b4 100644 --- a/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java +++ b/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java @@ -125,20 +125,24 @@ public class Flink114Shims extends FlinkShims { if ("yarn-application".equalsIgnoreCase(mode)) { // for yarn application mode, FLINK_HOME is container working directory String flinkHome = new File(".").getAbsolutePath(); - return getPyFlinkPythonPath(flinkHome + "/lib/python"); + return getPyFlinkPythonPath(new File(flinkHome + "/lib/python")); } String flinkHome = System.getenv("FLINK_HOME"); if (StringUtils.isNotBlank(flinkHome)) { - return getPyFlinkPythonPath(flinkHome + "/opt/python"); + return getPyFlinkPythonPath(new File(flinkHome + "/opt/python")); } else { throw new IOException("No FLINK_HOME is specified"); } } - private String getPyFlinkPythonPath(String pyFlinkFolder) { + private String getPyFlinkPythonPath(File pyFlinkFolder) throws IOException 
{ LOGGER.info("Getting pyflink lib from {}", pyFlinkFolder); - List<File> depFiles = Arrays.asList(new File(pyFlinkFolder).listFiles()); + if (!pyFlinkFolder.exists() || !pyFlinkFolder.isDirectory()) { + throw new IOException(String.format("PyFlink folder %s does not exist or is not a folder", + pyFlinkFolder.getAbsolutePath())); + } + List<File> depFiles = Arrays.asList(pyFlinkFolder.listFiles()); StringBuilder builder = new StringBuilder(); for (File file : depFiles) { LOGGER.info("Adding extracted file {} to PYTHONPATH", file.getAbsolutePath());