This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new deaf3d3e9f [ZEPPELIN-5600] Support Flink 1.15 (#4335) deaf3d3e9f is described below commit deaf3d3e9faf125e6d255c6cb405e650293b8b95 Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Fri May 13 14:01:55 2022 +0800 [ZEPPELIN-5600] Support Flink 1.15 (#4335) --- .github/workflows/core.yml | 27 +- docs/interpreter/flink.md | 7 + flink/flink-scala-parent/pom.xml | 40 +- .../org/apache/zeppelin/flink/TableEnvFactory.java | 138 +---- .../internal/ScalaShellStreamEnvironment.java | 21 +- .../zeppelin/flink/sql/AbstractStreamSqlJob.java | 16 +- .../zeppelin/flink/FlinkScalaInterpreter.scala | 17 +- .../zeppelin/flink/internal/FlinkILoop.scala | 2 +- .../java/org/apache/zeppelin/flink/FlinkShims.java | 11 +- .../org/apache/zeppelin/flink/Flink112Shims.java | 19 +- .../org/apache/zeppelin/flink/Flink113Shims.java | 19 +- .../org/apache/zeppelin/flink/Flink114Shims.java | 18 +- flink/flink1.15-shims/pom.xml | 200 +++++++ .../org/apache/zeppelin/flink/Flink115Shims.java} | 65 ++- .../zeppelin/flink/Flink115SqlInterpreter.java | 590 +++++++++++++++++++++ .../java/org/apache/zeppelin/flink/PrintUtils.java | 318 +++++++++++ .../zeppelin/flink/TimestampStringUtils.java | 143 +++++ .../flink/shims115/CollectStreamTableSink.java | 97 ++++ flink/pom.xml | 44 +- testing/env_python_3_with_flink_115.yml | 29 + .../integration/ZeppelinFlinkClusterTest115.java | 38 ++ .../launcher/FlinkInterpreterLauncher.java | 21 +- 22 files changed, 1666 insertions(+), 214 deletions(-) diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml index 30b675b6e9..b9b3953fc8 100644 --- a/.github/workflows/core.yml +++ b/.github/workflows/core.yml @@ -198,7 +198,7 @@ jobs: ${{ runner.os }}-zeppelin- - name: install environment run: | - ./mvnw install -DskipTests -DskipRat -Phadoop2 -Pintegration -pl 
zeppelin-interpreter-integration,zeppelin-web,spark-submit,spark/scala-2.11,spark/scala-2.12,spark/scala-2.13,markdown,flink-cmd,flink/flink-scala-2.11,flink/flink-scala-2.12,jdbc,shell -am + ./mvnw install -DskipTests -DskipRat -Phadoop2 -Pintegration -pl zeppelin-interpreter-integration,zeppelin-web,spark-submit,spark/scala-2.11,spark/scala-2.12,spark/scala-2.13,markdown,flink-cmd,flink/flink-scala-2.11,flink/flink-scala-2.12,jdbc,shell -am -Pflink-114 ./mvnw package -DskipRat -pl zeppelin-plugins -amd -DskipTests -B - name: Setup conda environment with python 3.7 and R uses: conda-incubator/setup-miniconda@v2 @@ -216,13 +216,16 @@ jobs: R -e "IRkernel::installspec()" - name: run tests run: ./mvnw test -DskipRat -pl zeppelin-interpreter-integration -Phadoop2 -Pintegration -DfailIfNoTests=false -Dtest=ZeppelinClientIntegrationTest,ZeppelinClientWithAuthIntegrationTest,ZSessionIntegrationTest,ShellIntegrationTest,JdbcIntegrationTest - + - name: Print zeppelin logs + if: always() + run: if [ -d "logs" ]; then cat logs/*; fi + flink-test-and-flink-integration-test: runs-on: ubuntu-20.04 strategy: fail-fast: false matrix: - flink: [112, 113, 114] + flink: [112, 113, 114, 115] steps: - name: Checkout uses: actions/checkout@v2 @@ -244,10 +247,16 @@ jobs: key: ${{ runner.os }}-zeppelin-${{ hashFiles('**/pom.xml') }} restore-keys: | ${{ runner.os }}-zeppelin- - - name: install environment + - name: install environment for flink before 1.15 (exclusive) + if: matrix.flink != '115' run: | ./mvnw install -DskipTests -DskipRat -am -pl flink/flink-scala-2.11,flink/flink-scala-2.12,flink-cmd,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -Phadoop2 -Pintegration -B ./mvnw clean package -pl zeppelin-plugins -amd -DskipTests -B + - name: install environment for flink after 1.15 (inclusive) + if: matrix.flink == '115' + run: | + ./mvnw install -DskipTests -DskipRat -am -pl flink/flink-scala-2.12,flink-cmd,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} 
-Phadoop2 -Pintegration -B + ./mvnw clean package -pl zeppelin-plugins -amd -DskipTests -B - name: Setup conda environment with python 3.7 and uses: conda-incubator/setup-miniconda@v2 with: @@ -259,8 +268,16 @@ jobs: channel-priority: true auto-activate-base: false use-mamba: true - - name: run tests + - name: run tests for flink before 1.15 (exclusive) + if: matrix.flink != '115' run: ./mvnw test -DskipRat -pl flink/flink-scala-2.11,flink/flink-scala-2.12,flink-cmd,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -Phadoop2 -Pintegration -DfailIfNoTests=false -B -Dtest=org.apache.zeppelin.flink.*Test,FlinkIntegrationTest${{ matrix.flink }} + - name: run tests for flink after 1.15 (inclusive) + if: matrix.flink == '115' + run: ./mvnw test -DskipRat -pl flink/flink-scala-2.12,flink-cmd,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -Phadoop2 -Pintegration -DfailIfNoTests=false -B -Dtest=org.apache.zeppelin.flink.*Test,FlinkIntegrationTest${{ matrix.flink }} + - name: Print zeppelin logs + if: always() + run: if [ -d "logs" ]; then cat logs/*; fi + spark-integration-test: runs-on: ubuntu-20.04 diff --git a/docs/interpreter/flink.md b/docs/interpreter/flink.md index fa86d8308e..0241ff9baf 100644 --- a/docs/interpreter/flink.md +++ b/docs/interpreter/flink.md @@ -144,6 +144,13 @@ docker run -u $(id -u) -p 8080:8080 --rm -v /mnt/disk1/flink-sql-cookbook-on-zep Download Flink 1.10 or afterwards (Scala 2.11 & 2.12 are both supported) +### Specific for Flink 1.15 + +Flink 1.15 is scala free and has changed its binary distribution. If you would like to make Zeppelin work with Flink 1.15, you need to do the following extra steps.
+* Move FLINK_HOME/opt/flink-table-planner_2.12-1.15.0.jar to FLINK_HOME/lib +* Move FLINK_HOME/lib/flink-table-planner-loader-1.15.0.jar to FLINK_HOME/opt +* Download flink-table-api-scala-bridge_2.12-1.15.0.jar and flink-table-api-scala_2.12-1.15.0.jar to FLINK_HOME/lib + ## Flink on Zeppelin Architecture diff --git a/flink/flink-scala-parent/pom.xml b/flink/flink-scala-parent/pom.xml index efebcedf8a..203773d6e2 100644 --- a/flink/flink-scala-parent/pom.xml +++ b/flink/flink-scala-parent/pom.xml @@ -41,6 +41,8 @@ <hive.version>2.3.4</hive.version> <hiverunner.version>4.0.0</hiverunner.version> <grpc.version>1.15.0</grpc.version> + <!-- Start from 1.15 flink is scala free (no scala version in its artifact) --> + <flink.library.scala.suffix>_${flink.scala.binary.version}</flink.library.scala.suffix> <flink.bin.download.url>https://archive.apache.org/dist/flink/flink-${flink.version}/flink-${flink.version}-bin-scala_${flink.scala.binary.version}.tgz</flink.bin.download.url> </properties> @@ -72,6 +74,12 @@ <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.zeppelin</groupId> + <artifactId>flink1.15-shims</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>org.apache.zeppelin</groupId> <artifactId>zeppelin-python</artifactId> @@ -147,14 +155,14 @@ <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-clients_${flink.scala.binary.version}</artifactId> + <artifactId>flink-clients${flink.library.scala.suffix}</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-yarn_${flink.scala.binary.version}</artifactId> + <artifactId>flink-yarn${flink.library.scala.suffix}</artifactId> <version>${flink.version}</version> <scope>provided</scope> <exclusions> @@ -193,7 +201,7 @@ <dependency> <groupId>org.apache.flink</groupId> - 
<artifactId>flink-table-api-java-bridge_${flink.scala.binary.version}</artifactId> + <artifactId>flink-table-api-java-bridge${flink.library.scala.suffix}</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> @@ -207,7 +215,7 @@ <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-java_${flink.scala.binary.version}</artifactId> + <artifactId>flink-streaming-java${flink.library.scala.suffix}</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> @@ -1017,6 +1025,30 @@ </dependencies> </profile> + <profile> + <id>flink-115</id> + <properties> + <flink.version>${flink1.15.version}</flink.version> + <flink.scala.version>2.12.7</flink.scala.version> + <flink.scala.binary.version>2.12</flink.scala.binary.version> + <flink.library.scala.suffix></flink.library.scala.suffix> + </properties> + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + </dependencies> + </profile> + <profile> <id>hive2</id> <activation> diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java index 1456a9f307..711993578d 100644 --- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java +++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java @@ -67,7 +67,6 @@ public class TableEnvFactory { private CatalogManager oldPlannerCatalogManager; private ModuleManager moduleManager; private FunctionCatalog functionCatalog; - private FunctionCatalog oldPlannerFunctionCatalog; public 
TableEnvFactory(FlinkVersion flinkVersion, @@ -94,12 +93,8 @@ public class TableEnvFactory { this.catalogManager = (CatalogManager) flinkShims.createCatalogManager(streamTableConfig.getConfiguration()); this.oldPlannerCatalogManager = (CatalogManager) flinkShims.createCatalogManager( this.oldPlannerStreamTableConfig.getConfiguration()); - this.moduleManager = new ModuleManager(); - - this.functionCatalog = new FunctionCatalog(streamTableConfig, catalogManager, moduleManager); - this.oldPlannerFunctionCatalog = new FunctionCatalog( - this.oldPlannerStreamTableConfig, this.oldPlannerCatalogManager, moduleManager); + this.functionCatalog = (FunctionCatalog) flinkShims.createFunctionCatalog(streamTableConfig, catalogManager, moduleManager); } public TableEnvironment createScalaFlinkBatchTableEnvironment() { @@ -121,67 +116,6 @@ public class TableEnvFactory { } } - public TableEnvironment createScalaFlinkStreamTableEnvironment(EnvironmentSettings settings, ClassLoader classLoader) { - try { - ImmutablePair<Object, Object> pair = flinkShims.createPlannerAndExecutor( - classLoader, settings, senv.getJavaEnv(), - oldPlannerStreamTableConfig, functionCatalog, catalogManager); - Planner planner = (Planner) pair.left; - Executor executor = (Executor) pair.right; - - Class clazz = Class - .forName("org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl"); - - try { - Constructor constructor = clazz - .getConstructor( - CatalogManager.class, - ModuleManager.class, - FunctionCatalog.class, - TableConfig.class, - org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.class, - Planner.class, - Executor.class, - boolean.class); - return (TableEnvironment) constructor.newInstance( - oldPlannerCatalogManager, - moduleManager, - oldPlannerFunctionCatalog, - oldPlannerStreamTableConfig, - senv, - planner, - executor, - settings.isStreamingMode()); - } catch (NoSuchMethodException e) { - // Flink 1.11.1 change the constructor signature, FLINK-18419 - 
Constructor constructor = clazz - .getConstructor( - CatalogManager.class, - ModuleManager.class, - FunctionCatalog.class, - TableConfig.class, - org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.class, - Planner.class, - Executor.class, - boolean.class, - ClassLoader.class); - return (TableEnvironment) constructor.newInstance( - oldPlannerCatalogManager, - moduleManager, - oldPlannerFunctionCatalog, - oldPlannerStreamTableConfig, - senv, - planner, - executor, - settings.isStreamingMode(), - classLoader); - } - - } catch (Exception e) { - throw new TableException("Fail to createScalaFlinkStreamTableEnvironment", e); - } - } - public TableEnvironment createJavaFlinkBatchTableEnvironment() { try { Class<?> clazz = Class @@ -203,74 +137,12 @@ public class TableEnvFactory { } } - public TableEnvironment createJavaFlinkStreamTableEnvironment(EnvironmentSettings settings, - ClassLoader classLoader) { - try { - ImmutablePair<Object, Object> pair = flinkShims.createPlannerAndExecutor( - classLoader, settings, senv.getJavaEnv(), - oldPlannerBatchTableConfig, functionCatalog, catalogManager); - Planner planner = (Planner) pair.left; - Executor executor = (Executor) pair.right; - - Class clazz = Class - .forName("org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl"); - - try { - Constructor constructor = clazz - .getConstructor( - CatalogManager.class, - ModuleManager.class, - FunctionCatalog.class, - TableConfig.class, - org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.class, - Planner.class, - Executor.class, - boolean.class); - return (TableEnvironment) constructor.newInstance( - oldPlannerCatalogManager, - moduleManager, - oldPlannerFunctionCatalog, - oldPlannerStreamTableConfig, - senv.getJavaEnv(), - planner, - executor, - settings.isStreamingMode()); - } catch (NoSuchMethodException e) { - // Flink 1.11.1 change the constructor signature, FLINK-18419 - Constructor constructor = clazz - .getConstructor( - 
CatalogManager.class, - ModuleManager.class, - FunctionCatalog.class, - TableConfig.class, - org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.class, - Planner.class, - Executor.class, - boolean.class, - ClassLoader.class); - return (TableEnvironment) constructor.newInstance( - oldPlannerCatalogManager, - moduleManager, - oldPlannerFunctionCatalog, - oldPlannerStreamTableConfig, - senv.getJavaEnv(), - planner, - executor, - settings.isStreamingMode(), - classLoader); - } - - } catch (Exception e) { - throw new TableException("Fail to createJavaFlinkStreamTableEnvironment", e); - } - } - public TableEnvironment createScalaBlinkStreamTableEnvironment(EnvironmentSettings settings, ClassLoader classLoader) { try { ImmutablePair<Object, Object> pair = flinkShims.createPlannerAndExecutor( classLoader, settings, senv.getJavaEnv(), - streamTableConfig, functionCatalog, catalogManager); + streamTableConfig, moduleManager, functionCatalog, catalogManager); Planner planner = (Planner) pair.left; Executor executor = (Executor) pair.right; @@ -327,7 +199,7 @@ public class TableEnvFactory { try { ImmutablePair<Object, Object> pair = flinkShims.createPlannerAndExecutor( classLoader, settings, senv.getJavaEnv(), - streamTableConfig, functionCatalog, catalogManager); + streamTableConfig, moduleManager, functionCatalog, catalogManager); Planner planner = (Planner) pair.left; Executor executor = (Executor) pair.right; @@ -386,7 +258,7 @@ public class TableEnvFactory { try { ImmutablePair<Object, Object> pair = flinkShims.createPlannerAndExecutor( classLoader, settings, senv.getJavaEnv(), - batchTableConfig, functionCatalog, catalogManager); + batchTableConfig, moduleManager, functionCatalog, catalogManager); Planner planner = (Planner) pair.left; Executor executor = (Executor) pair.right; @@ -443,7 +315,7 @@ public class TableEnvFactory { public void createStreamPlanner(EnvironmentSettings settings) { ImmutablePair<Object, Object> pair = 
flinkShims.createPlannerAndExecutor( Thread.currentThread().getContextClassLoader(), settings, senv.getJavaEnv(), - streamTableConfig, functionCatalog, catalogManager); + streamTableConfig, moduleManager, functionCatalog, catalogManager); Planner planner = (Planner) pair.left; this.flinkShims.setCatalogManagerSchemaResolver(catalogManager, planner.getParser(), settings); } diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/ScalaShellStreamEnvironment.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/ScalaShellStreamEnvironment.java index 6bba854ed2..bee40a0f69 100644 --- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/ScalaShellStreamEnvironment.java +++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/ScalaShellStreamEnvironment.java @@ -79,22 +79,17 @@ public class ScalaShellStreamEnvironment extends StreamExecutionEnvironment { } public Object getFlinkConfiguration() { - if (flinkVersion.isAfterFlink114()) { - // starting from Flink 1.14, getConfiguration() return the readonly copy of internal - // configuration, so we need to get the internal configuration object via reflection. - try { - Field configurationField = StreamExecutionEnvironment.class.getDeclaredField("configuration"); - configurationField.setAccessible(true); - return configurationField.get(this); - } catch (Exception e) { - throw new RuntimeException("Fail to get configuration from StreamExecutionEnvironment", e); - } - } else { - return super.getConfiguration(); + // starting from Flink 1.14, getConfiguration() return the readonly copy of internal + // configuration, so we need to get the internal configuration object via reflection. 
+ try { + Field configurationField = StreamExecutionEnvironment.class.getDeclaredField("configuration"); + configurationField.setAccessible(true); + return configurationField.get(this); + } catch (Exception e) { + throw new RuntimeException("Fail to get configuration from StreamExecutionEnvironment", e); } } - private List<URL> getUpdatedJarFiles() throws MalformedURLException { final URL jarUrl = flinkILoop.writeFilesToDisk().getAbsoluteFile().toURI().toURL(); final List<URL> allJarFiles = new ArrayList<>(jarFiles); diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java index 8288615830..1ee26143cc 100644 --- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java +++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java @@ -139,19 +139,7 @@ public abstract class AbstractStreamSqlJob { // new CollectStreamTableSink(iterator.getBindAddress(), iterator.getPort(), serializer); collectTableSink = (RetractStreamTableSink) collectTableSink.configure( outputType.getFieldNames(), outputType.getFieldTypes()); - - // workaround, otherwise it won't find the sink properly - String originalCatalog = stenv.getCurrentCatalog(); - String originalDatabase = stenv.getCurrentDatabase(); - try { - stenv.useCatalog("default_catalog"); - stenv.useDatabase("default_database"); - flinkShims.registerTableSink(stenv, tableName, collectTableSink); - table.insertInto(tableName); - } finally { - stenv.useCatalog(originalCatalog); - stenv.useDatabase(originalDatabase); - } + flinkShims.registerTableSink(stenv, tableName, collectTableSink); long delay = 1000L; long period = Long.parseLong( @@ -163,7 +151,7 @@ public abstract class AbstractStreamSqlJob { LOGGER.info("Run job: " + tableName + ", parallelism: " + parallelism); String jobName = 
context.getStringLocalProperty("jobName", tableName); - stenv.execute(jobName); + table.executeInsert(tableName).await(); LOGGER.info("Flink Job is finished, jobName: " + jobName); // wait for retrieve thread consume all data LOGGER.info("Waiting for retrieve thread to be done"); diff --git a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala index 1e9b1dcabb..40adcffeae 100644 --- a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala +++ b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala @@ -432,13 +432,18 @@ abstract class FlinkScalaInterpreter(val properties: Properties, this.benv, this.senv, tableConfig) // blink planner - var btEnvSetting = EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build() + var btEnvSetting = this.flinkShims.createBlinkPlannerEnvSettingBuilder() + .asInstanceOf[EnvironmentSettings.Builder] + .inBatchMode() + .build() this.btenv = tblEnvFactory.createJavaBlinkBatchTableEnvironment(btEnvSetting, getFlinkClassLoader); flinkILoop.bind("btenv", btenv.getClass().getCanonicalName(), btenv, List("@transient")) this.java_btenv = this.btenv - var stEnvSetting = - EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build() + var stEnvSetting = this.flinkShims.createBlinkPlannerEnvSettingBuilder() + .asInstanceOf[EnvironmentSettings.Builder] + .inStreamingMode() + .build() this.stenv = tblEnvFactory.createScalaBlinkStreamTableEnvironment(stEnvSetting, getFlinkClassLoader) flinkILoop.bind("stenv", stenv.getClass().getCanonicalName(), stenv, List("@transient")) this.java_stenv = tblEnvFactory.createJavaBlinkStreamTableEnvironment(stEnvSetting, getFlinkClassLoader) @@ -586,8 +591,10 @@ abstract class FlinkScalaInterpreter(val properties: Properties, val originalClassLoader = 
Thread.currentThread().getContextClassLoader try { Thread.currentThread().setContextClassLoader(getFlinkClassLoader) - val stEnvSetting = - EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build() + val stEnvSetting = this.flinkShims.createBlinkPlannerEnvSettingBuilder() + .asInstanceOf[EnvironmentSettings.Builder] + .inStreamingMode() + .build() this.tblEnvFactory.createStreamPlanner(stEnvSetting) } finally { Thread.currentThread().setContextClassLoader(originalClassLoader) diff --git a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkILoop.scala b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkILoop.scala index eedcbe41e9..b133487749 100644 --- a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkILoop.scala +++ b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkILoop.scala @@ -30,7 +30,7 @@ import org.apache.flink.api.scala.ExecutionEnvironment import org.apache.flink.core.execution.PipelineExecutorServiceLoader import org.apache.zeppelin.flink.{ApplicationModeExecutionEnvironment, ApplicationModeStreamEnvironment, FlinkScalaInterpreter} import FlinkShell.ExecutionMode -import org.apache.commons.lang.StringUtils +import org.apache.commons.lang3.StringUtils import org.slf4j.{Logger, LoggerFactory} import scala.tools.nsc.interpreter._ diff --git a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java index 2a3d82edef..a473a0f888 100644 --- a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java +++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java @@ -62,6 +62,9 @@ public abstract class FlinkShims { } else if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 14) { LOGGER.info("Initializing shims for Flink 1.14"); flinkShimsClass = 
Class.forName("org.apache.zeppelin.flink.Flink114Shims"); + } else if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 15) { + LOGGER.info("Initializing shims for Flink 1.15"); + flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink115Shims"); } else { throw new Exception("Flink version: '" + flinkVersion + "' is not supported yet"); } @@ -88,6 +91,8 @@ public abstract class FlinkShims { return flinkVersion; } + public abstract Object createFunctionCatalog(Object tableConfig, Object catalogManager, Object moduleManager); + public abstract void initInnerBatchSqlInterpreter(FlinkSqlContext flinkSqlContext); public abstract void initInnerStreamSqlInterpreter(FlinkSqlContext flinkSqlContext); @@ -142,7 +147,11 @@ public abstract class FlinkShims { public abstract ImmutablePair<Object, Object> createPlannerAndExecutor( ClassLoader classLoader, Object environmentSettings, Object sEnv, - Object tableConfig, Object functionCatalog, Object catalogManager); + Object tableConfig, Object moduleManager, Object functionCatalog, Object catalogManager); + + public abstract Object createBlinkPlannerEnvSettingBuilder(); + + public abstract Object createOldPlannerEnvSettingBuilder(); public abstract InterpreterResult runSqlList(String st, InterpreterContext context, boolean isBatch); } diff --git a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java index 757e7a487b..5088a7f623 100644 --- a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java +++ b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java @@ -48,6 +48,7 @@ import org.apache.flink.table.functions.AggregateFunction; import org.apache.flink.table.functions.ScalarFunction; import org.apache.flink.table.functions.TableAggregateFunction; import org.apache.flink.table.functions.TableFunction; +import 
org.apache.flink.table.module.ModuleManager; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.table.utils.PrintUtils; @@ -94,6 +95,11 @@ public class Flink112Shims extends FlinkShims { this.streamSqlInterpreter = new Flink112SqlInterpreter(flinkSqlContext, false); } + @Override + public Object createFunctionCatalog(Object tableConfig, Object catalogManager, Object moduleManager) { + return new FunctionCatalog((TableConfig) tableConfig, (CatalogManager) catalogManager, (ModuleManager) moduleManager); + } + @Override public void disableSysoutLogging(Object batchConfig, Object streamConfig) { // do nothing @@ -265,7 +271,7 @@ public class Flink112Shims extends FlinkShims { @Override public ImmutablePair<Object, Object> createPlannerAndExecutor( ClassLoader classLoader, Object environmentSettings, Object sEnv, - Object tableConfig, Object functionCatalog, Object catalogManager) { + Object tableConfig, Object moduleManager, Object functionCatalog, Object catalogManager) { EnvironmentSettings settings = (EnvironmentSettings) environmentSettings; Executor executor = (Executor) lookupExecutor(classLoader, settings, sEnv); Map<String, String> plannerProperties = settings.toPlannerProperties(); @@ -276,6 +282,17 @@ public class Flink112Shims extends FlinkShims { return ImmutablePair.of(planner, executor); } + @Override + public Object createBlinkPlannerEnvSettingBuilder() { + return EnvironmentSettings.newInstance().useBlinkPlanner(); + } + + @Override + public Object createOldPlannerEnvSettingBuilder() { + return EnvironmentSettings.newInstance().useOldPlanner(); + } + + @Override public InterpreterResult runSqlList(String st, InterpreterContext context, boolean isBatch) { if (isBatch) { return batchSqlInterpreter.runSqlList(st, context); diff --git a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java 
b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java index 792174ad94..1df3f92951 100644 --- a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java +++ b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java @@ -51,6 +51,7 @@ import org.apache.flink.table.functions.AggregateFunction; import org.apache.flink.table.functions.ScalarFunction; import org.apache.flink.table.functions.TableAggregateFunction; import org.apache.flink.table.functions.TableFunction; +import org.apache.flink.table.module.ModuleManager; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.table.utils.PrintUtils; @@ -97,6 +98,11 @@ public class Flink113Shims extends FlinkShims { this.streamSqlInterpreter = new Flink113SqlInterpreter(flinkSqlContext, false); } + @Override + public Object createFunctionCatalog(Object tableConfig, Object catalogManager, Object moduleManager) { + return new FunctionCatalog((TableConfig) tableConfig, (CatalogManager) catalogManager, (ModuleManager) moduleManager); + } + @Override public void disableSysoutLogging(Object batchConfig, Object streamConfig) { // do nothing @@ -285,7 +291,7 @@ public class Flink113Shims extends FlinkShims { @Override public ImmutablePair<Object, Object> createPlannerAndExecutor( ClassLoader classLoader, Object environmentSettings, Object sEnv, - Object tableConfig, Object functionCatalog, Object catalogManager) { + Object tableConfig, Object moduleManager, Object functionCatalog, Object catalogManager) { EnvironmentSettings settings = (EnvironmentSettings) environmentSettings; Executor executor = (Executor) lookupExecutor(classLoader, settings, sEnv); Map<String, String> plannerProperties = settings.toPlannerProperties(); @@ -296,6 +302,17 @@ public class Flink113Shims extends FlinkShims { return ImmutablePair.of(planner, executor); } + @Override + public Object 
createBlinkPlannerEnvSettingBuilder() { + return EnvironmentSettings.newInstance().useBlinkPlanner(); + } + + @Override + public Object createOldPlannerEnvSettingBuilder() { + return EnvironmentSettings.newInstance().useOldPlanner(); + } + + @Override public InterpreterResult runSqlList(String st, InterpreterContext context, boolean isBatch) { if (isBatch) { return batchSqlInterpreter.runSqlList(st, context); diff --git a/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java b/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java index 2e4e4b42ca..9660c9f469 100644 --- a/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java +++ b/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java @@ -49,6 +49,7 @@ import org.apache.flink.table.functions.AggregateFunction; import org.apache.flink.table.functions.ScalarFunction; import org.apache.flink.table.functions.TableAggregateFunction; import org.apache.flink.table.functions.TableFunction; +import org.apache.flink.table.module.ModuleManager; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.table.utils.PrintUtils; @@ -93,6 +94,11 @@ public class Flink114Shims extends FlinkShims { this.streamSqlInterpreter = new Flink114SqlInterpreter(flinkSqlContext, false); } + @Override + public Object createFunctionCatalog(Object tableConfig, Object catalogManager, Object moduleManager) { + return new FunctionCatalog((TableConfig) tableConfig, (CatalogManager) catalogManager, (ModuleManager) moduleManager); + } + @Override public void disableSysoutLogging(Object batchConfig, Object streamConfig) { // do nothing @@ -285,7 +291,7 @@ public class Flink114Shims extends FlinkShims { @Override public ImmutablePair<Object, Object> createPlannerAndExecutor( ClassLoader classLoader, Object environmentSettings, Object sEnv, - Object tableConfig, Object 
functionCatalog, Object catalogManager) { + Object tableConfig, Object moduleManager, Object functionCatalog, Object catalogManager) { EnvironmentSettings settings = (EnvironmentSettings) environmentSettings; Executor executor = (Executor) lookupExecutor(classLoader, environmentSettings, sEnv); Planner planner = PlannerFactoryUtil.createPlanner(settings.getPlanner(), executor, @@ -295,6 +301,16 @@ public class Flink114Shims extends FlinkShims { return ImmutablePair.of(planner, executor); } + @Override + public Object createBlinkPlannerEnvSettingBuilder() { + return EnvironmentSettings.newInstance().useBlinkPlanner(); + } + + @Override + public Object createOldPlannerEnvSettingBuilder() { + return EnvironmentSettings.newInstance().useOldPlanner(); + } + public InterpreterResult runSqlList(String st, InterpreterContext context, boolean isBatch) { if (isBatch) { return batchSqlInterpreter.runSqlList(st, context); diff --git a/flink/flink1.15-shims/pom.xml b/flink/flink1.15-shims/pom.xml new file mode 100644 index 0000000000..6921b06648 --- /dev/null +++ b/flink/flink1.15-shims/pom.xml @@ -0,0 +1,200 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>flink-parent</artifactId> + <groupId>org.apache.zeppelin</groupId> + <version>0.11.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <modelVersion>4.0.0</modelVersion> + <groupId>org.apache.zeppelin</groupId> + <artifactId>flink1.15-shims</artifactId> + <version>0.11.0-SNAPSHOT</version> + <packaging>jar</packaging> + <name>Zeppelin: Flink1.15 Shims</name> + + <properties> + <flink.version>${flink1.15.version}</flink.version> + <flink.scala.binary.version>2.12</flink.scala.binary.version> + </properties> + + <dependencies> + + <dependency> + <groupId>org.apache.zeppelin</groupId> + <artifactId>flink-shims</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-clients</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-scala_${flink.scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-scala-bridge_${flink.scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + 
<artifactId>flink-table-api-java-bridge</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-scala_${flink.scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-scala_${flink.scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-java</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-python_${flink.scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + </dependencies> + + <build> + <plugins> + + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + <executions> + <execution> + <id>eclipse-add-source</id> + <goals> + <goal>add-source</goal> + </goals> + </execution> + <execution> + <id>scala-compile-first</id> + <phase>process-resources</phase> + <goals> + <goal>compile</goal> + </goals> + </execution> + <execution> + <id>scala-test-compile-first</id> + <phase>process-test-resources</phase> + <goals> + <goal>testCompile</goal> + </goals> + </execution> + </executions> + <configuration> + <scalaVersion>${flink.scala.version}</scalaVersion> + <args> + 
<arg>-unchecked</arg> + <arg>-deprecation</arg> + <arg>-feature</arg> + <arg>-target:jvm-1.8</arg> + </args> + <jvmArgs> + <jvmArg>-Xms1024m</jvmArg> + <jvmArg>-Xmx1024m</jvmArg> + <jvmArg>-XX:MaxMetaspaceSize=${MaxMetaspace}</jvmArg> + </jvmArgs> + <javacArgs> + <javacArg>-source</javacArg> + <javacArg>${java.version}</javacArg> + <javacArg>-target</javacArg> + <javacArg>${java.version}</javacArg> + <javacArg>-Xlint:all,-serial,-path,-options</javacArg> + </javacArgs> + </configuration> + </plugin> + + <plugin> + <artifactId>maven-resources-plugin</artifactId> + <executions> + <execution> + <id>copy-interpreter-setting</id> + <phase>none</phase> + <configuration> + <skip>true</skip> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> diff --git a/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java b/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115Shims.java similarity index 84% copy from flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java copy to flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115Shims.java index 2e4e4b42ca..98c250773d 100644 --- a/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java +++ b/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115Shims.java @@ -49,13 +49,13 @@ import org.apache.flink.table.functions.AggregateFunction; import org.apache.flink.table.functions.ScalarFunction; import org.apache.flink.table.functions.TableAggregateFunction; import org.apache.flink.table.functions.TableFunction; +import org.apache.flink.table.module.ModuleManager; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.sinks.TableSink; -import org.apache.flink.table.utils.PrintUtils; import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; import org.apache.flink.util.FlinkException; -import 
org.apache.zeppelin.flink.shims114.CollectStreamTableSink; +import org.apache.zeppelin.flink.shims115.CollectStreamTableSink; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterResult; import org.slf4j.Logger; @@ -72,25 +72,30 @@ import java.util.Properties; /** - * Shims for flink 1.14 + * Shims for flink 1.15 */ -public class Flink114Shims extends FlinkShims { +public class Flink115Shims extends FlinkShims { - private static final Logger LOGGER = LoggerFactory.getLogger(Flink114Shims.class); + private static final Logger LOGGER = LoggerFactory.getLogger(Flink115Shims.class); - private Flink114SqlInterpreter batchSqlInterpreter; - private Flink114SqlInterpreter streamSqlInterpreter; + private Flink115SqlInterpreter batchSqlInterpreter; + private Flink115SqlInterpreter streamSqlInterpreter; - public Flink114Shims(FlinkVersion flinkVersion, Properties properties) { + public Flink115Shims(FlinkVersion flinkVersion, Properties properties) { super(flinkVersion, properties); } public void initInnerBatchSqlInterpreter(FlinkSqlContext flinkSqlContext) { - this.batchSqlInterpreter = new Flink114SqlInterpreter(flinkSqlContext, true); + this.batchSqlInterpreter = new Flink115SqlInterpreter(flinkSqlContext, true); } public void initInnerStreamSqlInterpreter(FlinkSqlContext flinkSqlContext) { - this.streamSqlInterpreter = new Flink114SqlInterpreter(flinkSqlContext, false); + this.streamSqlInterpreter = new Flink115SqlInterpreter(flinkSqlContext, false); + } + + @Override + public Object createFunctionCatalog(Object tableConfig, Object catalogManager, Object moduleManager) { + return new FunctionCatalog((TableConfig) tableConfig, (CatalogManager) catalogManager, (ModuleManager) moduleManager); } @Override @@ -98,7 +103,6 @@ public class Flink114Shims extends FlinkShims { // do nothing } - @Override public Object createStreamExecutionEnvironmentFactory(Object streamExecutionEnvironment) { return new 
StreamExecutionEnvironmentFactory() { @@ -172,14 +176,12 @@ public class Flink114Shims extends FlinkShims { @Override public Object fromDataSet(Object btenv, Object ds) { - return null; - //return Flink114ScalaShims.fromDataSet((BatchTableEnvironment) btenv, (DataSet) ds); + throw new RuntimeException("Conversion from DataSet is not supported in Flink 1.15"); } @Override public Object toDataSet(Object btenv, Object table) { - return null; - //return Flink114ScalaShims.toDataSet((BatchTableEnvironment) btenv, (Table) table); + throw new RuntimeException("Conversion to DataSet is not supported in Flink 1.15"); } @Override @@ -190,7 +192,7 @@ public class Flink114Shims extends FlinkShims { @Override public void registerScalarFunction(Object btenv, String name, Object scalarFunction) { - ((StreamTableEnvironmentImpl)(btenv)).createTemporarySystemFunction(name, (ScalarFunction) scalarFunction); + ((StreamTableEnvironmentImpl) (btenv)).createTemporarySystemFunction(name, (ScalarFunction) scalarFunction); } @Override @@ -211,6 +213,7 @@ public class Flink114Shims extends FlinkShims { /** * Flink 1.11 bind CatalogManager with parser which make blink and flink could not share the same CatalogManager. * This is a workaround which always reset CatalogTableSchemaResolver before running any flink code. 
+ * * @param catalogManager * @param parserObject * @param environmentSetting @@ -224,10 +227,10 @@ public class Flink114Shims extends FlinkShims { @Override public Object updateEffectiveConfig(Object cliFrontend, Object commandLine, Object effectiveConfig) { - CustomCommandLine customCommandLine = ((CliFrontend)cliFrontend).validateAndGetActiveCommandLine((CommandLine) commandLine); + CustomCommandLine customCommandLine = ((CliFrontend) cliFrontend).validateAndGetActiveCommandLine((CommandLine) commandLine); try { - ((Configuration) effectiveConfig).addAll(customCommandLine.toConfiguration((CommandLine) commandLine)); - return effectiveConfig; + ((Configuration) effectiveConfig).addAll(customCommandLine.toConfiguration((CommandLine) commandLine)); + return effectiveConfig; } catch (FlinkException e) { throw new RuntimeException("Fail to call addAll", e); } @@ -241,8 +244,7 @@ public class Flink114Shims extends FlinkShims { @Override public void setOldPlanner(Object tableConfig) { - ((TableConfig) tableConfig).getConfiguration() - .set(TableConfigOptions.TABLE_PLANNER, PlannerType.OLD); + } @Override @@ -263,12 +265,12 @@ public class Flink114Shims extends FlinkShims { } private Object lookupExecutor(ClassLoader classLoader, - Object settings, - Object sEnv) { + Object settings, + Object sEnv) { try { final ExecutorFactory executorFactory = FactoryUtil.discoverFactory( - classLoader, ExecutorFactory.class, ((EnvironmentSettings) settings).getExecutor()); + classLoader, ExecutorFactory.class, ExecutorFactory.DEFAULT_IDENTIFIER); final Method createMethod = executorFactory .getClass() @@ -285,16 +287,27 @@ public class Flink114Shims extends FlinkShims { @Override public ImmutablePair<Object, Object> createPlannerAndExecutor( ClassLoader classLoader, Object environmentSettings, Object sEnv, - Object tableConfig, Object functionCatalog, Object catalogManager) { + Object tableConfig, Object moduleManager, Object functionCatalog, Object catalogManager) { 
EnvironmentSettings settings = (EnvironmentSettings) environmentSettings; Executor executor = (Executor) lookupExecutor(classLoader, environmentSettings, sEnv); - Planner planner = PlannerFactoryUtil.createPlanner(settings.getPlanner(), executor, + Planner planner = PlannerFactoryUtil.createPlanner(executor, (TableConfig) tableConfig, + (ModuleManager) moduleManager, (CatalogManager) catalogManager, (FunctionCatalog) functionCatalog); return ImmutablePair.of(planner, executor); } + @Override + public Object createBlinkPlannerEnvSettingBuilder() { + return EnvironmentSettings.newInstance(); + } + + @Override + public Object createOldPlannerEnvSettingBuilder() { + return EnvironmentSettings.newInstance(); + } + public InterpreterResult runSqlList(String st, InterpreterContext context, boolean isBatch) { if (isBatch) { return batchSqlInterpreter.runSqlList(st, context); diff --git a/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java b/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java new file mode 100644 index 0000000000..6c0c67fb2b --- /dev/null +++ b/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java @@ -0,0 +1,590 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.flink; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.core.execution.JobListener; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.SqlParserException; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.internal.TableEnvironmentInternal; +import org.apache.flink.table.delegation.Parser; +import org.apache.flink.table.operations.*; +import org.apache.flink.table.operations.command.HelpOperation; +import org.apache.flink.table.operations.command.SetOperation; +import org.apache.flink.table.operations.ddl.*; +import org.apache.flink.table.utils.EncodingUtils; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.CollectionUtil; +import org.apache.flink.util.Preconditions; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.ZeppelinContext; +import org.apache.zeppelin.interpreter.util.SqlSplitter; +import org.jline.utils.AttributedString; +import org.jline.utils.AttributedStringBuilder; +import org.jline.utils.AttributedStyle; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.*; +import java.util.concurrent.ExecutionException; +import 
java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + + +public class Flink115SqlInterpreter { + + private static final Logger LOGGER = LoggerFactory.getLogger(Flink115SqlInterpreter.class); + private static final String CMD_DESC_DELIMITER = "\t\t"; + + /** + * SQL Client HELP command helper class. + */ + private static final class SQLCliCommandsDescriptions { + private int commandMaxLength; + private final Map<String, String> commandsDescriptions; + + public SQLCliCommandsDescriptions() { + this.commandsDescriptions = new LinkedHashMap<>(); + this.commandMaxLength = -1; + } + + public SQLCliCommandsDescriptions commandDescription(String command, String description) { + Preconditions.checkState( + StringUtils.isNotBlank(command), "content of command must not be empty."); + Preconditions.checkState( + StringUtils.isNotBlank(description), + "content of command's description must not be empty."); + this.updateMaxCommandLength(command.length()); + this.commandsDescriptions.put(command, description); + return this; + } + + private void updateMaxCommandLength(int newLength) { + Preconditions.checkState(newLength > 0); + if (this.commandMaxLength < newLength) { + this.commandMaxLength = newLength; + } + } + + public AttributedString build() { + AttributedStringBuilder attributedStringBuilder = new AttributedStringBuilder(); + if (!this.commandsDescriptions.isEmpty()) { + this.commandsDescriptions.forEach( + (cmd, cmdDesc) -> { + attributedStringBuilder + .style(AttributedStyle.DEFAULT.bold()) + .append( + String.format( + String.format("%%-%ds", commandMaxLength), cmd)) + .append(CMD_DESC_DELIMITER) + .style(AttributedStyle.DEFAULT) + .append(cmdDesc) + .append('\n'); + }); + } + return attributedStringBuilder.toAttributedString(); + } + } + + private static final AttributedString 
SQL_CLI_COMMANDS_DESCRIPTIONS = + new SQLCliCommandsDescriptions() + .commandDescription("HELP", "Prints the available commands.") + .commandDescription( + "SET", + "Sets a session configuration property. Syntax: \"SET '<key>'='<value>';\". Use \"SET;\" for listing all properties.") + .commandDescription( + "RESET", + "Resets a session configuration property. Syntax: \"RESET '<key>';\". Use \"RESET;\" for reset all session properties.") + .commandDescription( + "INSERT INTO", + "Inserts the results of a SQL SELECT query into a declared table sink.") + .commandDescription( + "INSERT OVERWRITE", + "Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data.") + .commandDescription( + "SELECT", "Executes a SQL SELECT query on the Flink cluster.") + .commandDescription( + "EXPLAIN", + "Describes the execution plan of a query or table with the given name.") + .commandDescription( + "BEGIN STATEMENT SET", + "Begins a statement set. Syntax: \"BEGIN STATEMENT SET;\"") + .commandDescription("END", "Ends a statement set. Syntax: \"END;\"") + // (TODO) zjffdu, ADD/REMOVE/SHOW JAR + .build(); + + // -------------------------------------------------------------------------------------------- + + public static final AttributedString MESSAGE_HELP = + new AttributedStringBuilder() + .append("The following commands are available:\n\n") + .append(SQL_CLI_COMMANDS_DESCRIPTIONS) + .style(AttributedStyle.DEFAULT.underline()) + .append("\nHint") + .style(AttributedStyle.DEFAULT) + .append( + ": Make sure that a statement ends with \";\" for finalizing (multi-line) statements.") + // About Documentation Link. 
+ .style(AttributedStyle.DEFAULT) + .append( + "\nYou can also type any Flink SQL statement, please visit https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/overview/ for more details.") + .toAttributedString(); + + private static final String MESSAGE_NO_STATEMENT_IN_STATEMENT_SET = "No statement in the statement set, skip submit."; + + private FlinkSqlContext flinkSqlContext; + private TableEnvironment tbenv; + private ZeppelinContext z; + private Parser sqlParser; + private SqlSplitter sqlSplitter; + // paragraphId -> list of ModifyOperation, used for statement set in 2 syntax: + // 1. runAsOne= true + // 2. begin statement set; + // ... + // end; + private Map<String, List<ModifyOperation>> statementOperationsMap = new HashMap<>(); + private boolean isBatch; + private ReentrantReadWriteLock.WriteLock lock = new ReentrantReadWriteLock().writeLock(); + + + public Flink115SqlInterpreter(FlinkSqlContext flinkSqlContext, boolean isBatch) { + this.flinkSqlContext = flinkSqlContext; + this.isBatch = isBatch; + if (isBatch) { + this.tbenv = (TableEnvironment) flinkSqlContext.getBtenv(); + } else { + this.tbenv = (TableEnvironment) flinkSqlContext.getStenv(); + } + this.z = (ZeppelinContext) flinkSqlContext.getZeppelinContext(); + this.sqlParser = ((TableEnvironmentInternal) tbenv).getParser(); + this.sqlSplitter = new SqlSplitter(); + JobListener jobListener = new JobListener() { + @Override + public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable) { + if (lock.isHeldByCurrentThread()) { + lock.unlock(); + LOGGER.info("UnLock JobSubmitLock"); + } + } + + @Override + public void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable) { + + } + }; + + ((ExecutionEnvironment) flinkSqlContext.getBenv()).registerJobListener(jobListener); + ((StreamExecutionEnvironment) flinkSqlContext.getSenv()).registerJobListener(jobListener); + } + + public InterpreterResult runSqlList(String st, 
InterpreterContext context) { + try { + boolean runAsOne = Boolean.parseBoolean(context.getStringLocalProperty("runAsOne", "false")); + if (runAsOne) { + statementOperationsMap.put(context.getParagraphId(), new ArrayList<>()); + } + + String jobName = context.getLocalProperties().get("jobName"); + if (StringUtils.isNotBlank(jobName)) { + tbenv.getConfig().getConfiguration().set(PipelineOptions.NAME, jobName); + } + + List<String> sqls = sqlSplitter.splitSql(st).stream().map(String::trim).collect(Collectors.toList()); + for (String sql : sqls) { + List<Operation> operations = null; + try { + operations = sqlParser.parse(sql); + } catch (SqlParserException e) { + context.out.write("%text Invalid Sql statement: " + sql + "\n"); + context.out.write(MESSAGE_HELP.toString()); + return new InterpreterResult(InterpreterResult.Code.ERROR, e.toString()); + } + + try { + callOperation(sql, operations.get(0), context); + context.out.flush(); + } catch (Throwable e) { + LOGGER.error("Fail to run sql:" + sql, e); + try { + context.out.write("%text Fail to run sql command: " + + sql + "\n" + ExceptionUtils.getStackTrace(e) + "\n"); + } catch (IOException ex) { + LOGGER.warn("Unexpected exception:", ex); + return new InterpreterResult(InterpreterResult.Code.ERROR, + ExceptionUtils.getStackTrace(e)); + } + return new InterpreterResult(InterpreterResult.Code.ERROR); + } + } + + if (runAsOne && !statementOperationsMap.getOrDefault(context.getParagraphId(), new ArrayList<>()).isEmpty()) { + try { + lock.lock(); + List<ModifyOperation> modifyOperations = statementOperationsMap.getOrDefault(context.getParagraphId(), new ArrayList<>()); + if (!modifyOperations.isEmpty()) { + callInserts(modifyOperations, context); + } + } catch (Exception e) { + LOGGER.error("Fail to execute sql as one job", e); + return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e)); + } finally { + if (lock.isHeldByCurrentThread()) { + lock.unlock(); + } + } + } + } catch 
(Exception e) { + LOGGER.error("Fail to execute sql", e); + return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e)); + } finally { + statementOperationsMap.remove(context.getParagraphId()); + } + + return new InterpreterResult(InterpreterResult.Code.SUCCESS); + } + + private void callOperation(String sql, Operation operation, InterpreterContext context) throws IOException { + if (operation instanceof HelpOperation) { + // HELP + callHelp(context); + } else if (operation instanceof SetOperation) { + // SET + callSet((SetOperation) operation, context); + } else if (operation instanceof ModifyOperation) { + // INSERT INTO/OVERWRITE + callInsert((ModifyOperation) operation, context); + } else if (operation instanceof QueryOperation) { + // SELECT + callSelect(sql, (QueryOperation) operation, context); + } else if (operation instanceof ExplainOperation) { + // EXPLAIN + callExplain((ExplainOperation) operation, context); + } else if (operation instanceof BeginStatementSetOperation) { + // BEGIN STATEMENT SET + callBeginStatementSet(context); + } else if (operation instanceof EndStatementSetOperation) { + // END + callEndStatementSet(context); + } else if (operation instanceof ShowCreateTableOperation) { + // SHOW CREATE TABLE + callShowCreateTable((ShowCreateTableOperation) operation, context); + } else if (operation instanceof ShowCatalogsOperation) { + callShowCatalogs(context); + } else if (operation instanceof ShowCurrentCatalogOperation) { + callShowCurrentCatalog(context); + } else if (operation instanceof UseCatalogOperation) { + callUseCatalog(((UseCatalogOperation) operation).getCatalogName(), context); + } else if (operation instanceof CreateCatalogOperation) { + callDDL(sql, context, "Catalog has been created."); + } else if (operation instanceof DropCatalogOperation) { + callDDL(sql, context, "Catalog has been dropped."); + } else if (operation instanceof UseDatabaseOperation) { + UseDatabaseOperation useDBOperation = 
(UseDatabaseOperation) operation; + callUseDatabase(useDBOperation.getDatabaseName(), context); + } else if (operation instanceof CreateDatabaseOperation) { + callDDL(sql, context, "Database has been created."); + } else if (operation instanceof DropDatabaseOperation) { + callDDL(sql, context, "Database has been removed."); + } else if (operation instanceof AlterDatabaseOperation) { + callDDL(sql, context, "Alter database succeeded!"); + } else if (operation instanceof ShowDatabasesOperation) { + callShowDatabases(context); + } else if (operation instanceof ShowCurrentDatabaseOperation) { + callShowCurrentDatabase(context); + } else if (operation instanceof CreateTableOperation || operation instanceof CreateTableASOperation) { + callDDL(sql, context, "Table has been created."); + } else if (operation instanceof AlterTableOperation) { + callDDL(sql, context, "Alter table succeeded!"); + } else if (operation instanceof DropTableOperation) { + callDDL(sql, context, "Table has been dropped."); + } else if (operation instanceof DescribeTableOperation) { + DescribeTableOperation describeTableOperation = (DescribeTableOperation) operation; + callDescribe(describeTableOperation.getSqlIdentifier().getObjectName(), context); + } else if (operation instanceof ShowTablesOperation) { + callShowTables(context); + } else if (operation instanceof CreateViewOperation) { + callDDL(sql, context, "View has been created."); + } else if (operation instanceof DropViewOperation) { + callDDL(sql, context, "View has been dropped."); + } else if (operation instanceof AlterViewOperation) { + callDDL(sql, context, "Alter view succeeded!"); + } else if (operation instanceof CreateCatalogFunctionOperation || operation instanceof CreateTempSystemFunctionOperation) { + callDDL(sql, context, "Function has been created."); + } else if (operation instanceof DropCatalogFunctionOperation || operation instanceof DropTempSystemFunctionOperation) { + callDDL(sql, context, "Function has been removed."); + 
} else if (operation instanceof AlterCatalogFunctionOperation) { + callDDL(sql, context, "Alter function succeeded!"); + } else if (operation instanceof ShowFunctionsOperation) { + callShowFunctions(context); + } else if (operation instanceof ShowModulesOperation) { + callShowModules(context); + } else if (operation instanceof ShowPartitionsOperation) { + ShowPartitionsOperation showPartitionsOperation = (ShowPartitionsOperation) operation; + callShowPartitions(showPartitionsOperation.asSummaryString(), context); + } else { + throw new IOException(operation.getClass().getName() + " is not supported"); + } + } + + + private void callHelp(InterpreterContext context) throws IOException { + context.out.write(MESSAGE_HELP.toString() + "\n"); + } + + private void callInsert(ModifyOperation operation, InterpreterContext context) throws IOException { + if (statementOperationsMap.containsKey(context.getParagraphId())) { + List<ModifyOperation> modifyOperations = statementOperationsMap.get(context.getParagraphId()); + modifyOperations.add(operation); + } else { + callInserts(Collections.singletonList(operation), context); + } + } + + private void callInserts(List<ModifyOperation> operations, InterpreterContext context) throws IOException { + if (!isBatch) { + context.getLocalProperties().put("flink.streaming.insert_into", "true"); + } + TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(operations); + checkState(tableResult.getJobClient().isPresent()); + try { + tableResult.await(); + JobClient jobClient = tableResult.getJobClient().get(); + if (jobClient.getJobStatus().get() == JobStatus.FINISHED) { + context.out.write("Insertion successfully.\n"); + } else { + throw new IOException("Job is failed, " + jobClient.getJobExecutionResult().get().toString()); + } + } catch (InterruptedException e) { + throw new IOException("Flink job is interrupted", e); + } catch (ExecutionException e) { + throw new IOException("Flink job is failed", e); + } + } + + 
private void callShowCreateTable(ShowCreateTableOperation showCreateTableOperation, InterpreterContext context) throws IOException { + try { + lock.lock(); + TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(showCreateTableOperation); + String explanation = + Objects.requireNonNull(tableResult.collect().next().getField(0)).toString(); + context.out.write(explanation + "\n"); + } finally { + if (lock.isHeldByCurrentThread()) { + lock.unlock(); + } + } + } + + private void callExplain(ExplainOperation explainOperation, InterpreterContext context) throws IOException { + try { + lock.lock(); + TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(explainOperation); + String explanation = + Objects.requireNonNull(tableResult.collect().next().getField(0)).toString(); + context.out.write(explanation + "\n"); + } finally { + if (lock.isHeldByCurrentThread()) { + lock.unlock(); + } + } + } + + public void callSelect(String sql, QueryOperation queryOperation, InterpreterContext context) throws IOException { + try { + lock.lock(); + if (isBatch) { + callBatchInnerSelect(sql, context); + } else { + callStreamInnerSelect(sql, context); + } + } finally { + if (lock.isHeldByCurrentThread()) { + lock.unlock(); + } + } + } + + public void callBatchInnerSelect(String sql, InterpreterContext context) throws IOException { + Table table = this.tbenv.sqlQuery(sql); + String result = z.showData(table); + context.out.write(result); + } + + public void callStreamInnerSelect(String sql, InterpreterContext context) throws IOException { + flinkSqlContext.getStreamSqlSelectConsumer().accept(sql); + } + + public void callSet(SetOperation setOperation, InterpreterContext context) throws IOException { + if (setOperation.getKey().isPresent() && setOperation.getValue().isPresent()) { + // set a property + String key = setOperation.getKey().get().trim(); + String value = setOperation.getValue().get().trim(); + 
this.tbenv.getConfig().getConfiguration().setString(key, value); + LOGGER.info("Set table config: {}={}", key, value); + } else { + // show all properties + final Map<String, String> properties = this.tbenv.getConfig().getConfiguration().toMap(); + List<String> prettyEntries = new ArrayList<>(); + for (String key : properties.keySet()) { + prettyEntries.add( + String.format( + "'%s' = '%s'", + EncodingUtils.escapeSingleQuotes(key), + EncodingUtils.escapeSingleQuotes(properties.get(key)))); + } + prettyEntries.sort(String::compareTo); + prettyEntries.forEach(entry -> { + try { + context.out.write(entry + "\n"); + } catch (IOException e) { + LOGGER.warn("Fail to write output", e); + } + }); + } + } + + private void callBeginStatementSet(InterpreterContext context) throws IOException { + statementOperationsMap.put(context.getParagraphId(), new ArrayList<>()); + } + + private void callEndStatementSet(InterpreterContext context) throws IOException { + List<ModifyOperation> modifyOperations = statementOperationsMap.get(context.getParagraphId()); + if (modifyOperations != null && !modifyOperations.isEmpty()) { + callInserts(modifyOperations, context); + } else { + context.out.write(MESSAGE_NO_STATEMENT_IN_STATEMENT_SET); + } + } + + private void callUseCatalog(String catalog, InterpreterContext context) throws IOException { + tbenv.executeSql("USE CATALOG `" + catalog + "`"); + } + + private void callUseDatabase(String databaseName, + InterpreterContext context) throws IOException { + this.tbenv.executeSql("USE `" + databaseName + "`"); + } + + private void callShowCatalogs(InterpreterContext context) throws IOException { + TableResult tableResult = this.tbenv.executeSql("SHOW Catalogs"); + List<String> catalogs = CollectionUtil.iteratorToList(tableResult.collect()).stream() + .map(r -> checkNotNull(r.getField(0)).toString()) + .collect(Collectors.toList()); + context.out.write("%table catalog\n" + StringUtils.join(catalogs, "\n") + "\n"); + } + + private void 
callShowCurrentCatalog(InterpreterContext context) throws IOException { + TableResult tableResult = this.tbenv.executeSql("SHOW Current Catalog"); + String catalog = tableResult.collect().next().getField(0).toString(); + context.out.write("%text current catalog: " + catalog + "\n"); + } + + private void callShowDatabases(InterpreterContext context) throws IOException { + TableResult tableResult = this.tbenv.executeSql("SHOW Databases"); + List<String> databases = CollectionUtil.iteratorToList(tableResult.collect()).stream() + .map(r -> checkNotNull(r.getField(0)).toString()) + .collect(Collectors.toList()); + context.out.write( + "%table database\n" + StringUtils.join(databases, "\n") + "\n"); + } + + private void callShowCurrentDatabase(InterpreterContext context) throws IOException { + TableResult tableResult = this.tbenv.executeSql("SHOW Current Database"); + String database = tableResult.collect().next().getField(0).toString(); + context.out.write("%text current database: " + database + "\n"); + } + + private void callShowTables(InterpreterContext context) throws IOException { + TableResult tableResult = this.tbenv.executeSql("SHOW Tables"); + List<String> tables = CollectionUtil.iteratorToList(tableResult.collect()).stream() + .map(r -> checkNotNull(r.getField(0)).toString()) + .filter(tbl -> !tbl.startsWith("UnnamedTable")) + .collect(Collectors.toList()); + context.out.write( + "%table table\n" + StringUtils.join(tables, "\n") + "\n"); + } + + private void callShowFunctions(InterpreterContext context) throws IOException { + TableResult tableResult = this.tbenv.executeSql("SHOW Functions"); + List<String> functions = CollectionUtil.iteratorToList(tableResult.collect()).stream() + .map(r -> checkNotNull(r.getField(0)).toString()) + .collect(Collectors.toList()); + context.out.write( + "%table function\n" + StringUtils.join(functions, "\n") + "\n"); + } + + private void callShowModules(InterpreterContext context) throws IOException { + String[] modules = 
this.tbenv.listModules(); + context.out.write("%table modules\n" + StringUtils.join(modules, "\n") + "\n"); + } + + private void callShowPartitions(String sql, InterpreterContext context) throws IOException { + TableResult tableResult = this.tbenv.executeSql(sql); + List<String> partions = CollectionUtil.iteratorToList(tableResult.collect()).stream() + .map(r -> checkNotNull(r.getField(0)).toString()) + .collect(Collectors.toList()); + context.out.write( + "%table partitions\n" + StringUtils.join(partions, "\n") + "\n"); + } + + private void callDDL(String sql, InterpreterContext context, String message) throws IOException { + try { + lock.lock(); + this.tbenv.executeSql(sql); + } finally { + if (lock.isHeldByCurrentThread()) { + lock.unlock(); + } + } + context.out.write(message + "\n"); + } + + private void callDescribe(String name, InterpreterContext context) throws IOException { + TableResult tableResult = null; + try { + tableResult = tbenv.executeSql("DESCRIBE " + name); + } catch (Exception e) { + throw new IOException("Fail to describe table: " + name, e); + } + CloseableIterator<Row> result = tableResult.collect(); + StringBuilder builder = new StringBuilder(); + builder.append("Column\tType\n"); + while (result.hasNext()) { + Row row = result.next(); + builder.append(row.getField(0) + "\t" + row.getField(1) + "\n"); + } + context.out.write("%table\n" + builder.toString()); + } +} diff --git a/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/PrintUtils.java b/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/PrintUtils.java new file mode 100644 index 0000000000..a35ad3a6cd --- /dev/null +++ b/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/PrintUtils.java @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.flink; + + +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.*; +import org.apache.flink.types.Row; +import org.apache.flink.util.StringUtils; + +import java.sql.Time; +import java.sql.Timestamp; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision; +import static org.apache.zeppelin.flink.TimestampStringUtils.*; + +/** + * Copied from flink-project with minor modification. 
+ * */ +public class PrintUtils { + + public static final String NULL_COLUMN = "(NULL)"; + private static final String COLUMN_TRUNCATED_FLAG = "..."; + + private PrintUtils() {} + + + public static String[] rowToString( + Row row, ResolvedSchema resolvedSchema, ZoneId sessionTimeZone) { + return rowToString(row, NULL_COLUMN, false, resolvedSchema, sessionTimeZone); + } + + public static String[] rowToString( + Row row, + String nullColumn, + boolean printRowKind, + ResolvedSchema resolvedSchema, + ZoneId sessionTimeZone) { + final int len = printRowKind ? row.getArity() + 1 : row.getArity(); + final List<String> fields = new ArrayList<>(len); + if (printRowKind) { + fields.add(row.getKind().shortString()); + } + for (int i = 0; i < row.getArity(); i++) { + final Object field = row.getField(i); + final LogicalType fieldType = + resolvedSchema.getColumnDataTypes().get(i).getLogicalType(); + if (field == null) { + fields.add(nullColumn); + } else { + fields.add( + StringUtils.arrayAwareToString( + formattedTimestamp(field, fieldType, sessionTimeZone))); + } + } + return fields.toArray(new String[0]); + } + + /** + * Normalizes field that contains TIMESTAMP, TIMESTAMP_LTZ and TIME type data. + * + * <p>This method also supports nested type ARRAY, ROW, MAP. 
+ */ + private static Object formattedTimestamp( + Object field, LogicalType fieldType, ZoneId sessionTimeZone) { + final LogicalTypeRoot typeRoot = fieldType.getTypeRoot(); + if (field == null) { + return "null"; + } + switch (typeRoot) { + case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return formatTimestampField(field, fieldType, sessionTimeZone); + case TIME_WITHOUT_TIME_ZONE: + return formatTimeField(field); + case ARRAY: + LogicalType elementType = ((ArrayType) fieldType).getElementType(); + if (field instanceof List) { + List<?> array = (List<?>) field; + Object[] formattedArray = new Object[array.size()]; + for (int i = 0; i < array.size(); i++) { + formattedArray[i] = + formattedTimestamp(array.get(i), elementType, sessionTimeZone); + } + return formattedArray; + } else if (field.getClass().isArray()) { + // primitive type + if (field.getClass() == byte[].class) { + byte[] array = (byte[]) field; + Object[] formattedArray = new Object[array.length]; + for (int i = 0; i < array.length; i++) { + formattedArray[i] = + formattedTimestamp(array[i], elementType, sessionTimeZone); + } + return formattedArray; + } else if (field.getClass() == short[].class) { + short[] array = (short[]) field; + Object[] formattedArray = new Object[array.length]; + for (int i = 0; i < array.length; i++) { + formattedArray[i] = + formattedTimestamp(array[i], elementType, sessionTimeZone); + } + return formattedArray; + } else if (field.getClass() == int[].class) { + int[] array = (int[]) field; + Object[] formattedArray = new Object[array.length]; + for (int i = 0; i < array.length; i++) { + formattedArray[i] = + formattedTimestamp(array[i], elementType, sessionTimeZone); + } + return formattedArray; + } else if (field.getClass() == long[].class) { + long[] array = (long[]) field; + Object[] formattedArray = new Object[array.length]; + for (int i = 0; i < array.length; i++) { + formattedArray[i] = + formattedTimestamp(array[i], elementType, 
sessionTimeZone); + } + return formattedArray; + } else if (field.getClass() == float[].class) { + float[] array = (float[]) field; + Object[] formattedArray = new Object[array.length]; + for (int i = 0; i < array.length; i++) { + formattedArray[i] = + formattedTimestamp(array[i], elementType, sessionTimeZone); + } + return formattedArray; + } else if (field.getClass() == double[].class) { + double[] array = (double[]) field; + Object[] formattedArray = new Object[array.length]; + for (int i = 0; i < array.length; i++) { + formattedArray[i] = + formattedTimestamp(array[i], elementType, sessionTimeZone); + } + return formattedArray; + } else if (field.getClass() == boolean[].class) { + boolean[] array = (boolean[]) field; + Object[] formattedArray = new Object[array.length]; + for (int i = 0; i < array.length; i++) { + formattedArray[i] = + formattedTimestamp(array[i], elementType, sessionTimeZone); + } + return formattedArray; + } else if (field.getClass() == char[].class) { + char[] array = (char[]) field; + Object[] formattedArray = new Object[array.length]; + for (int i = 0; i < array.length; i++) { + formattedArray[i] = + formattedTimestamp(array[i], elementType, sessionTimeZone); + } + return formattedArray; + } else { + // non-primitive type + Object[] array = (Object[]) field; + Object[] formattedArray = new Object[array.length]; + for (int i = 0; i < array.length; i++) { + formattedArray[i] = + formattedTimestamp(array[i], elementType, sessionTimeZone); + } + return formattedArray; + } + } else { + return field; + } + case ROW: + if (fieldType instanceof RowType && field instanceof Row) { + Row row = (Row) field; + Row formattedRow = new Row(row.getKind(), row.getArity()); + for (int i = 0; i < ((RowType) fieldType).getFields().size(); i++) { + LogicalType type = ((RowType) fieldType).getFields().get(i).getType(); + formattedRow.setField( + i, formattedTimestamp(row.getField(i), type, sessionTimeZone)); + } + return formattedRow; + + } else if (fieldType 
instanceof RowType && field instanceof RowData) { + RowData rowData = (RowData) field; + Row formattedRow = new Row(rowData.getRowKind(), rowData.getArity()); + for (int i = 0; i < ((RowType) fieldType).getFields().size(); i++) { + LogicalType type = ((RowType) fieldType).getFields().get(i).getType(); + RowData.FieldGetter fieldGetter = RowData.createFieldGetter(type, i); + formattedRow.setField( + i, + formattedTimestamp( + fieldGetter.getFieldOrNull(rowData), + type, + sessionTimeZone)); + } + return formattedRow; + } else { + return field; + } + case MAP: + LogicalType keyType = ((MapType) fieldType).getKeyType(); + LogicalType valueType = ((MapType) fieldType).getValueType(); + if (fieldType instanceof MapType && field instanceof Map) { + Map<Object, Object> map = ((Map) field); + Map<Object, Object> formattedMap = new HashMap<>(map.size()); + for (Object key : map.keySet()) { + formattedMap.put( + formattedTimestamp(key, keyType, sessionTimeZone), + formattedTimestamp(map.get(key), valueType, sessionTimeZone)); + } + return formattedMap; + } else if (fieldType instanceof MapType && field instanceof MapData) { + MapData map = ((MapData) field); + Map<Object, Object> formattedMap = new HashMap<>(map.size()); + Object[] keyArray = + (Object[]) formattedTimestamp(map.keyArray(), keyType, sessionTimeZone); + Object[] valueArray = + (Object[]) + formattedTimestamp( + map.valueArray(), valueType, sessionTimeZone); + for (int i = 0; i < keyArray.length; i++) { + formattedMap.put(keyArray[i], valueArray[i]); + } + return formattedMap; + } else { + return field; + } + default: + return field; + } + } + + /** + * Formats the print content of TIMESTAMP and TIMESTAMP_LTZ type data, consider the user + * configured time zone. 
+ */ + private static Object formatTimestampField( + Object timestampField, LogicalType fieldType, ZoneId sessionTimeZone) { + switch (fieldType.getTypeRoot()) { + case TIMESTAMP_WITHOUT_TIME_ZONE: + final int precision = getPrecision(fieldType); + if (timestampField instanceof java.sql.Timestamp) { + // conversion between java.sql.Timestamp and TIMESTAMP_WITHOUT_TIME_ZONE + return timestampToString( + ((Timestamp) timestampField).toLocalDateTime(), precision); + } else if (timestampField instanceof java.time.LocalDateTime) { + return timestampToString(((LocalDateTime) timestampField), precision); + } else if (timestampField instanceof TimestampData) { + return timestampToString( + ((TimestampData) timestampField).toLocalDateTime(), precision); + } else { + return timestampField; + } + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + Instant instant = null; + if (timestampField instanceof java.time.Instant) { + instant = ((Instant) timestampField); + } else if (timestampField instanceof java.sql.Timestamp) { + Timestamp timestamp = ((Timestamp) timestampField); + // conversion between java.sql.Timestamp and TIMESTAMP_WITH_LOCAL_TIME_ZONE + instant = + TimestampData.fromEpochMillis( + timestamp.getTime(), timestamp.getNanos() % 1000_000) + .toInstant(); + } else if (timestampField instanceof TimestampData) { + instant = ((TimestampData) timestampField).toInstant(); + } else if (timestampField instanceof Integer) { + instant = Instant.ofEpochSecond((Integer) timestampField); + } else if (timestampField instanceof Long) { + instant = Instant.ofEpochMilli((Long) timestampField); + } + if (instant != null) { + return timestampToString( + instant.atZone(sessionTimeZone).toLocalDateTime(), + getPrecision(fieldType)); + } else { + return timestampField; + } + default: + return timestampField; + } + } + + /** Formats the print content of TIME type data. 
*/ + private static Object formatTimeField(Object timeField) { + if (timeField.getClass().isAssignableFrom(int.class) || timeField instanceof Integer) { + return unixTimeToString((int) timeField); + } else if (timeField.getClass().isAssignableFrom(long.class) || timeField instanceof Long) { + return unixTimeToString(((Long) timeField).intValue()); + } else if (timeField instanceof Time) { + return unixTimeToString(timeToInternal((Time) timeField)); + } else if (timeField instanceof LocalTime) { + return unixTimeToString(localTimeToUnixDate((LocalTime) timeField)); + } else { + return timeField; + } + } +} diff --git a/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/TimestampStringUtils.java b/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/TimestampStringUtils.java new file mode 100644 index 0000000000..c52104e45a --- /dev/null +++ b/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/TimestampStringUtils.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Helpers for rendering SQL TIME / TIMESTAMP values as strings.
 *
 * <p>Copied from the Flink project with minor modification. The original copy
 * was decompiler output; its magic char literals (e.g. {@code '\uea60'} ==
 * 60000) and empty for-loops are rewritten here with named constants and
 * plain loops, with identical behavior.
 */
public class TimestampStringUtils {

  // Milliseconds per time unit; previously hidden behind char literals.
  private static final long MILLIS_PER_DAY = 86400000L;
  private static final int MILLIS_PER_HOUR = 3600000;
  private static final int MILLIS_PER_MINUTE = 60000;
  private static final int MILLIS_PER_SECOND = 1000;

  // Captured once; used to shift epoch-based java.sql.Time to local wall time.
  private static final TimeZone LOCAL_TZ = TimeZone.getDefault();

  public TimestampStringUtils() {
  }

  /**
   * Renders a LocalDateTime as {@code yyyy-MM-dd HH:mm:ss[.fraction]}.
   *
   * <p>Trailing zeros of the nanosecond field are stripped down to at most
   * {@code precision} digits; non-zero digits beyond the precision are kept
   * (original Flink behavior).
   */
  public static String timestampToString(LocalDateTime ldt, int precision) {
    String fraction = pad(9, (long) ldt.getNano());
    while (fraction.length() > precision && fraction.endsWith("0")) {
      fraction = fraction.substring(0, fraction.length() - 1);
    }

    StringBuilder ymdhms = ymdhms(
        new StringBuilder(),
        ldt.getYear(), ldt.getMonthValue(), ldt.getDayOfMonth(),
        ldt.getHour(), ldt.getMinute(), ldt.getSecond());
    if (fraction.length() > 0) {
      ymdhms.append(".").append(fraction);
    }
    return ymdhms.toString();
  }

  /** Left-pads {@code v} with zeros to {@code length} digits. */
  private static String pad(int length, long v) {
    StringBuilder s = new StringBuilder(Long.toString(v));
    while (s.length() < length) {
      s.insert(0, "0");
    }
    return s.toString();
  }

  /** Appends {@code HH:mm:ss}. */
  private static StringBuilder hms(StringBuilder b, int h, int m, int s) {
    int2(b, h);
    b.append(':');
    int2(b, m);
    b.append(':');
    int2(b, s);
    return b;
  }

  /** Appends {@code yyyy-MM-dd HH:mm:ss}. */
  private static StringBuilder ymdhms(StringBuilder b, int year, int month, int day, int h, int m, int s) {
    ymd(b, year, month, day);
    b.append(' ');
    hms(b, h, m, s);
    return b;
  }

  /** Appends {@code yyyy-MM-dd}. */
  private static StringBuilder ymd(StringBuilder b, int year, int month, int day) {
    int4(b, year);
    b.append('-');
    int2(b, month);
    b.append('-');
    int2(b, day);
    return b;
  }

  /** Appends {@code i} as exactly four decimal digits. */
  private static void int4(StringBuilder buf, int i) {
    buf.append((char) ('0' + i / 1000 % 10));
    buf.append((char) ('0' + i / 100 % 10));
    buf.append((char) ('0' + i / 10 % 10));
    buf.append((char) ('0' + i % 10));
  }

  /** Appends {@code i} as exactly two decimal digits. */
  private static void int2(StringBuilder buf, int i) {
    buf.append((char) ('0' + i / 10 % 10));
    buf.append((char) ('0' + i % 10));
  }

  /** Renders milliseconds-since-midnight as {@code HH:mm:ss}. */
  public static String unixTimeToString(int time) {
    StringBuilder buf = new StringBuilder(8);
    unixTimeToString(buf, time, 0);
    return buf.toString();
  }

  private static void unixTimeToString(StringBuilder buf, int time, int precision) {
    // Normalize negative values into [0, MILLIS_PER_DAY).
    while (time < 0) {
      time = (int) (time + MILLIS_PER_DAY);
    }

    int h = time / MILLIS_PER_HOUR;
    int remainder = time % MILLIS_PER_HOUR;
    int m = remainder / MILLIS_PER_MINUTE;
    remainder = remainder % MILLIS_PER_MINUTE;
    int s = remainder / MILLIS_PER_SECOND;
    int ms = remainder % MILLIS_PER_SECOND;

    int2(buf, h);
    buf.append(':');
    int2(buf, m);
    buf.append(':');
    int2(buf, s);
    if (precision > 0) {
      buf.append('.');
      while (precision > 0) {
        buf.append((char) ('0' + ms / 100));
        ms %= 100;
        ms *= 10;
        if (ms == 0) {
          break;
        }
        --precision;
      }
    }
  }

  /** Converts a {@link Time} to milliseconds since local midnight. */
  public static int timeToInternal(Time time) {
    long ts = time.getTime() + (long) LOCAL_TZ.getOffset(time.getTime());
    return (int) (ts % MILLIS_PER_DAY);
  }

  /** Converts a {@link LocalTime} to milliseconds since midnight. */
  public static int localTimeToUnixDate(LocalTime time) {
    return time.getHour() * MILLIS_PER_HOUR
        + time.getMinute() * MILLIS_PER_MINUTE
        + time.getSecond() * MILLIS_PER_SECOND
        + time.getNano() / 1000000;
  }
}
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.zeppelin.flink.shims115;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.experimental.CollectSink;
import org.apache.flink.table.sinks.RetractStreamTableSink;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetAddress;
import java.util.UUID;

/**
 * Table sink for collecting the results locally using sockets.
 */
public class CollectStreamTableSink implements RetractStreamTableSink<Row> {

  private static final Logger LOGGER = LoggerFactory.getLogger(CollectStreamTableSink.class);

  // Endpoint of the local CollectSink and the serializer for the
  // (retract-flag, row) tuples sent over the socket.
  private final InetAddress targetAddress;
  private final int targetPort;
  private final TypeSerializer<Tuple2<Boolean, Row>> serializer;

  // Populated via configure(); null on a freshly constructed instance.
  private String[] fieldNames;
  private TypeInformation<?>[] fieldTypes;

  public CollectStreamTableSink(InetAddress targetAddress,
                                int targetPort,
                                TypeSerializer<Tuple2<Boolean, Row>> serializer) {
    LOGGER.info("Use address: " + targetAddress.getHostAddress() + ":" + targetPort);
    this.targetAddress = targetAddress;
    this.targetPort = targetPort;
    this.serializer = serializer;
  }

  @Override
  public String[] getFieldNames() {
    return fieldNames;
  }

  @Override
  public TypeInformation<?>[] getFieldTypes() {
    return fieldTypes;
  }

  @Override
  public CollectStreamTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
    // The TableSink contract requires returning a configured copy rather
    // than mutating this instance.
    final CollectStreamTableSink copy =
        new CollectStreamTableSink(targetAddress, targetPort, serializer);
    copy.fieldNames = fieldNames;
    copy.fieldTypes = fieldTypes;
    return copy;
  }

  @Override
  public TypeInformation<Row> getRecordType() {
    return Types.ROW_NAMED(fieldNames, fieldTypes);
  }

  @Override
  public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> stream) {
    // add sink: route every tuple into a single-parallelism socket sink so
    // the client receives results in one ordered stream.
    return stream
        .addSink(new CollectSink<>(targetAddress, targetPort, serializer))
        .name("Zeppelin Flink Sql Stream Collect Sink " + UUID.randomUUID())
        .setParallelism(1);
  }

  @Override
  public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() {
    // Output is (retract-flag, row): false means retraction of a prior row.
    return new TupleTypeInfo<>(Types.BOOLEAN, getRecordType());
  }
}
<module>flink-scala-parent</module> - <module>flink-scala-2.11</module> - <module>flink-scala-2.12</module> <module>flink-shims</module> <module>flink1.12-shims</module> <module>flink1.13-shims</module> <module>flink1.14-shims</module> + <module>flink1.15-shims</module> </modules> <properties> <flink1.12.version>1.12.4</flink1.12.version> <flink1.13.version>1.13.2</flink1.13.version> <flink1.14.version>1.14.0</flink1.14.version> + <flink1.15.version>1.15.0</flink1.15.version> <flink.scala.version>2.11.12</flink.scala.version> <flink.scala.binary.version>2.11</flink.scala.binary.version> </properties> -</project> \ No newline at end of file + <profiles> + <profile> + <id>flink-115</id> + <!-- Flink 1.15 only support scala 2.12--> + <modules> + <module>flink-scala-2.12</module> + </modules> + </profile> + + <profile> + <id>flink-114</id> + <modules> + <module>flink-scala-2.11</module> + <module>flink-scala-2.12</module> + </modules> + </profile> + + <profile> + <id>flink-113</id> + <modules> + <module>flink-scala-2.11</module> + <module>flink-scala-2.12</module> + </modules> + </profile> + + <profile> + <id>flink-112</id> + <activation> + <activeByDefault>true</activeByDefault> + </activation> + <modules> + <module>flink-scala-2.11</module> + <module>flink-scala-2.12</module> + </modules> + </profile> + </profiles> + +</project> diff --git a/testing/env_python_3_with_flink_115.yml b/testing/env_python_3_with_flink_115.yml new file mode 100644 index 0000000000..a434029e4e --- /dev/null +++ b/testing/env_python_3_with_flink_115.yml @@ -0,0 +1,29 @@ +name: python_3_with_flink +channels: + - conda-forge + - defaults +dependencies: + - pycodestyle + - scipy + - numpy=1.19.5 + - grpcio + - protobuf + - pandasql + - ipython + - ipython_genutils + - ipykernel + - jupyter_client=5 + - hvplot + - plotnine + - seaborn + - intake + - intake-parquet + - intake-xarray + - altair + - vega_datasets + - plotly + - jinja2=3.0.3 + - pip + - pip: + - apache-flink==1.15.0 + diff --git 
a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest115.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest115.java new file mode 100644 index 0000000000..4fbf5a93c8 --- /dev/null +++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest115.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.integration; + +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.List; + +//@RunWith(value = Parameterized.class) +public class ZeppelinFlinkClusterTest115 extends ZeppelinFlinkClusterTest { + + @Parameterized.Parameters + public static List<Object[]> data() { + return Arrays.asList(new Object[][]{ + {"1.15.0", "2.12"} + }); + } + + public ZeppelinFlinkClusterTest115(String flinkVersion, String scalaVersion) throws Exception { + super(flinkVersion, scalaVersion); + } +} diff --git a/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java b/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java index 83a1f24b37..b7e5f83a72 100644 --- a/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java +++ b/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java @@ -136,7 +136,7 @@ public class FlinkInterpreterLauncher extends StandardInterpreterLauncher { private String chooseFlinkAppJar(String flinkHome) throws IOException { File flinkLibFolder = new File(flinkHome, "lib"); List<File> flinkDistFiles = - Arrays.stream(flinkLibFolder.listFiles(file -> file.getName().contains("flink-dist_"))) + Arrays.stream(flinkLibFolder.listFiles(file -> file.getName().contains("flink-dist"))) .collect(Collectors.toList()); if (flinkDistFiles.size() > 1) { throw new IOException("More than 1 flink-dist files: " + @@ -144,9 +144,15 @@ public class FlinkInterpreterLauncher extends StandardInterpreterLauncher { .map(file -> file.getAbsolutePath()) .collect(Collectors.joining(","))); } - String scalaVersion = "2.11"; - if (flinkDistFiles.get(0).getName().contains("2.12")) { - scalaVersion = "2.12"; + if (flinkDistFiles.isEmpty()) { + throw new IOException(String.format("No flink-dist 
jar found under %s", flinkHome + "/lib")); + } + + // scala 2.12 is the only supported Scala version starting from Flink 1.15, + // so use 2.12 as the default value + String scalaVersion = "2.12"; + if (flinkDistFiles.get(0).getName().contains("2.11")) { + scalaVersion = "2.11"; + } final String flinkScalaVersion = scalaVersion; File flinkInterpreterFolder = @@ -156,8 +162,11 @@ public class FlinkInterpreterLauncher extends StandardInterpreterLauncher { .listFiles(file -> file.getName().endsWith(".jar"))) .filter(file -> file.getName().contains(flinkScalaVersion)) .collect(Collectors.toList()); - if (flinkScalaJars.size() > 1) { - throw new IOException("More than 1 flink scala files: " + + + if (flinkScalaJars.isEmpty()) { + throw new IOException("No flink scala jar file is found"); + } else if (flinkScalaJars.size() > 1) { + throw new IOException("More than 1 flink scala jar files: " + flinkScalaJars.stream() .map(file -> file.getAbsolutePath()) .collect(Collectors.joining(",")));