This is an automated email from the ASF dual-hosted git repository. jongyoul pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new 4b60f08 [ZEPPELIN-5668] Support statement set in flink (#4332) 4b60f08 is described below commit 4b60f08bd0ad241a7b197d7165abbb7c066ec979 Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Wed Mar 30 22:27:30 2022 +0800 [ZEPPELIN-5668] Support statement set in flink (#4332) --- .github/workflows/core.yml | 2 +- .../flink/FlinkStreamSqlInterpreterTest.java | 168 ++++++++++++++++++++- .../zeppelin/flink/Flink113SqlInterpreter.java | 21 +-- .../zeppelin/flink/Flink114SqlInterpreter.java | 21 +-- 4 files changed, 191 insertions(+), 21 deletions(-) diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml index d4d9b95..75cccdb 100644 --- a/.github/workflows/core.yml +++ b/.github/workflows/core.yml @@ -250,7 +250,7 @@ jobs: auto-activate-base: false use-mamba: true - name: run tests - run: ./mvnw test -DskipRat -pl flink/flink-scala-2.11,flink/flink-scala-2.12,flink-cmd,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -Phadoop2 -Pintegration -DfailIfNoTests=false -B -Dtest=org.apache.zeppelin.flink.*,FlinkIntegrationTest${{ matrix.flink }} + run: ./mvnw test -DskipRat -pl flink/flink-scala-2.11,flink/flink-scala-2.12,flink-cmd,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -Phadoop2 -Pintegration -DfailIfNoTests=false -B -Dtest=org.apache.zeppelin.flink.*Test,FlinkIntegrationTest${{ matrix.flink }} spark-integration-test: runs-on: ubuntu-20.04 diff --git a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java index 9894764..49b609d 100644 --- a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java +++ b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java @@ -20,12 +20,16 @@ package org.apache.zeppelin.flink; import net.jodah.concurrentunit.Waiter; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.core.execution.JobListener; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResultMessage; import org.junit.Test; +import javax.annotation.Nullable; import java.io.File; import java.io.IOException; import java.nio.file.Files; @@ -38,11 +42,40 @@ import static org.junit.Assert.assertEquals; public class FlinkStreamSqlInterpreterTest extends FlinkSqlInterpreterTest { + + private static class FlinkJobListener implements JobListener { + + private int jobCount = 0; + + public int getJobCount() { + return jobCount; + } + + @Override + public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable) { + jobCount ++; + } + + @Override + public void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable) { + + } + } + + private FlinkJobListener flinkJobListener; + @Override protected FlinkSqlInterpreter createFlinkSqlInterpreter(Properties properties) { return new FlinkStreamSqlInterpreter(properties); } + @Override + public void setUp() throws InterpreterException, IOException { + super.setUp(); + flinkJobListener = new FlinkJobListener(); + flinkInterpreter.getStreamExecutionEnvironment().getJavaEnv().registerJobListener(flinkJobListener); + } + @Test public void testSingleStreamSql() throws IOException, InterpreterException { String initStreamScalaScript = getInitStreamScript(100); @@ -434,7 +467,65 @@ public class FlinkStreamSqlInterpreterTest extends FlinkSqlInterpreterTest { } @Test - public void testMultipleInsertInto() throws InterpreterException, IOException { + public void testMultipleInsertIntoSeparately() throws InterpreterException, IOException { + hiveShell.execute("create table source_table (id int, name string)"); + hiveShell.execute("insert into source_table values(1, 'name')"); + + File destDir = Files.createTempDirectory("flink_test").toFile(); + FileUtils.deleteDirectory(destDir); + InterpreterResult result = sqlInterpreter.interpret( + "CREATE TABLE dest_table (\n" + + "id int,\n" + + "name string" + + ") WITH (\n" + + "'format.field-delimiter'=',',\n" + + "'connector.type'='filesystem',\n" + + "'format.derive-schema'='true',\n" + + "'connector.path'='" + destDir.getAbsolutePath() + "',\n" + + "'format.type'='csv'\n" + + ");", getInterpreterContext()); + + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + + File destDir2 = Files.createTempDirectory("flink_test").toFile(); + FileUtils.deleteDirectory(destDir2); + result = sqlInterpreter.interpret( + "CREATE TABLE dest_table2 (\n" + + "id int,\n" + + "name string" + + ") WITH (\n" + + "'format.field-delimiter'=',',\n" + + "'connector.type'='filesystem',\n" + + "'format.derive-schema'='true',\n" + + "'connector.path'='" + destDir2.getAbsolutePath() + "',\n" + + "'format.type'='csv'\n" + + ");", getInterpreterContext()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + + InterpreterContext context = getInterpreterContext(); + result = sqlInterpreter.interpret( + "insert into dest_table select * from source_table;\n" + + "insert into dest_table2 select * from source_table", + context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + // two flink jobs are executed + assertEquals(2, flinkJobListener.getJobCount()); + + // check dest_table + context = getInterpreterContext(); + result = sqlInterpreter.interpret("select count(1) as c from dest_table", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals("c\n1\n", context.out.toString()); + + // check dest_table2 + context = getInterpreterContext(); + result = sqlInterpreter.interpret("select count(1) as c from dest_table2", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals("c\n1\n", context.out.toString()); + } + + @Test + public void testMultipleInsertIntoRunAsOne() throws InterpreterException, IOException { hiveShell.execute("create table source_table (id int, name string)"); hiveShell.execute("insert into source_table values(1, 'name')"); @@ -475,18 +566,22 @@ public class FlinkStreamSqlInterpreterTest extends FlinkSqlInterpreterTest { "insert into dest_table select * from source_table;insert into dest_table2 select * from source_table", context); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + // only one flink job is executed + assertEquals(1, flinkJobListener.getJobCount()); // check dest_table context = getInterpreterContext(); result = sqlInterpreter.interpret("select count(1) as c from dest_table", context); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); assertEquals("c\n1\n", context.out.toString()); + assertEquals(2, flinkJobListener.getJobCount()); // check dest_table2 context = getInterpreterContext(); result = sqlInterpreter.interpret("select count(1) as c from dest_table2", context); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); assertEquals("c\n1\n", context.out.toString()); + assertEquals(3, flinkJobListener.getJobCount()); // runAsOne won't affect the select statement. context = getInterpreterContext(); @@ -495,8 +590,77 @@ public class FlinkStreamSqlInterpreterTest extends FlinkSqlInterpreterTest { "select 1 as a", context); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); assertEquals("a\n1\n", context.out.toString()); + assertEquals(4, flinkJobListener.getJobCount()); + } + + @Test + public void testStatementSet() throws IOException, InterpreterException { + if (flinkInterpreter.getFlinkVersion().getMinorVersion() == 12) { + LOGGER.warn("Skip Flink 1.12 as statement set is not supported before 1.12"); + return; + } + hiveShell.execute("create table source_table (id int, name string)"); + hiveShell.execute("insert into source_table values(1, 'name')"); + + File destDir = Files.createTempDirectory("flink_test").toFile(); + FileUtils.deleteDirectory(destDir); + InterpreterContext context = getInterpreterContext(); + InterpreterResult result = sqlInterpreter.interpret( + "CREATE TABLE dest_table (\n" + + "id int,\n" + + "name string" + + ") WITH (\n" + + "'format.field-delimiter'=',',\n" + + "'connector.type'='filesystem',\n" + + "'format.derive-schema'='true',\n" + + "'connector.path'='" + destDir.getAbsolutePath() + "',\n" + + "'format.type'='csv'\n" + + ");", context); + + assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); + + File destDir2 = Files.createTempDirectory("flink_test").toFile(); + FileUtils.deleteDirectory(destDir2); + context = getInterpreterContext(); + result = sqlInterpreter.interpret( + "CREATE TABLE dest_table2 (\n" + + "id int,\n" + + "name string" + + ") WITH (\n" + + "'format.field-delimiter'=',',\n" + + "'connector.type'='filesystem',\n" + + "'format.derive-schema'='true',\n" + + "'connector.path'='" + destDir2.getAbsolutePath() + "',\n" + + "'format.type'='csv'\n" + + ");", context); + assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); + + // insert into 2 sink tables in one statement set + context = getInterpreterContext(); + result = sqlInterpreter.interpret( + "begin statement set;\n" + + "insert into dest_table select * from source_table;\n" + + "insert into dest_table2 select * from source_table;\n" + + "end;", + context); + assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); + // only one flink job is executed + assertEquals(1, flinkJobListener.getJobCount()); + + // check dest_table + context = getInterpreterContext(); + result = sqlInterpreter.interpret("select count(1) as c from dest_table", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals("c\n1\n", context.out.toString()); + assertEquals(2, flinkJobListener.getJobCount()); + + // check dest_table2 + context = getInterpreterContext(); + result = sqlInterpreter.interpret("select count(1) as c from dest_table2", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals("c\n1\n", context.out.toString()); + assertEquals(3, flinkJobListener.getJobCount()); } @Test diff --git a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113SqlInterpreter.java b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113SqlInterpreter.java index e3fe741..fa01fac 100644 --- a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113SqlInterpreter.java +++ b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113SqlInterpreter.java @@ -164,8 +164,11 @@ public class Flink113SqlInterpreter { private ZeppelinContext z; private Parser sqlParser; private SqlSplitter sqlSplitter; - // paragraphId -> Boolean, indicate whether it is runAsOne mode for the current paragraph. - private Map<String, Boolean> statementModeMap = new HashMap<>(); + // paragraphId -> list of ModifyOperation, used for statement set in 2 syntax: + // 1. runAsOne= true + // 2. begin statement set; + // ... + // end; private Map<String, List<ModifyOperation>> statementOperationsMap = new HashMap<>(); private boolean isBatch; private ReentrantReadWriteLock.WriteLock lock = new ReentrantReadWriteLock().writeLock(); @@ -204,7 +207,10 @@ public class Flink113SqlInterpreter { public InterpreterResult runSqlList(String st, InterpreterContext context) { try { boolean runAsOne = Boolean.parseBoolean(context.getStringLocalProperty("runAsOne", "false")); - statementModeMap.put(context.getParagraphId(), runAsOne); + if (runAsOne) { + statementOperationsMap.put(context.getParagraphId(), new ArrayList<>()); + } + String jobName = context.getLocalProperties().get("jobName"); if (StringUtils.isNotBlank(jobName)) { tbenv.getConfig().getConfiguration().set(PipelineOptions.NAME, jobName); @@ -259,7 +265,6 @@ public class Flink113SqlInterpreter { return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e)); } finally { statementOperationsMap.remove(context.getParagraphId()); - statementModeMap.remove(context.getParagraphId()); } return new InterpreterResult(InterpreterResult.Code.SUCCESS); @@ -345,10 +350,9 @@ public class Flink113SqlInterpreter { } private void callInsert(CatalogSinkModifyOperation operation, InterpreterContext context) throws IOException { - if (statementModeMap.getOrDefault(context.getParagraphId(), false)) { - List<ModifyOperation> modifyOperations = statementOperationsMap.getOrDefault(context.getParagraphId(), new ArrayList<>()); + if (statementOperationsMap.containsKey(context.getParagraphId())) { + List<ModifyOperation> modifyOperations = statementOperationsMap.get(context.getParagraphId()); modifyOperations.add(operation); - statementOperationsMap.put(context.getParagraphId(), modifyOperations); } else { callInserts(Collections.singletonList(operation), context); } @@ -444,7 +448,7 @@ public class Flink113SqlInterpreter { } private void callBeginStatementSet(InterpreterContext context) throws IOException { - statementModeMap.put(context.getParagraphId(), true); + statementOperationsMap.put(context.getParagraphId(), new ArrayList<>()); } private void callEndStatementSet(InterpreterContext context) throws IOException { @@ -454,7 +458,6 @@ public class Flink113SqlInterpreter { } else { context.out.write(MESSAGE_NO_STATEMENT_IN_STATEMENT_SET); } - statementModeMap.remove(context.getParagraphId()); } private void callUseCatalog(String catalog, InterpreterContext context) throws IOException { diff --git a/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114SqlInterpreter.java b/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114SqlInterpreter.java index f155fcc..eb0d684 100644 --- a/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114SqlInterpreter.java +++ b/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114SqlInterpreter.java @@ -169,8 +169,11 @@ public class Flink114SqlInterpreter { private ZeppelinContext z; private Parser sqlParser; private SqlSplitter sqlSplitter; - // paragraphId -> Boolean, indicate whether it is runAsOne mode for the current paragraph. - private Map<String, Boolean> statementModeMap = new HashMap<>(); + // paragraphId -> list of ModifyOperation, used for statement set in 2 syntax: + // 1. runAsOne= true + // 2. begin statement set; + // ... + // end; private Map<String, List<ModifyOperation>> statementOperationsMap = new HashMap<>(); private boolean isBatch; private ReentrantReadWriteLock.WriteLock lock = new ReentrantReadWriteLock().writeLock(); @@ -209,7 +212,10 @@ public class Flink114SqlInterpreter { public InterpreterResult runSqlList(String st, InterpreterContext context) { try { boolean runAsOne = Boolean.parseBoolean(context.getStringLocalProperty("runAsOne", "false")); - statementModeMap.put(context.getParagraphId(), runAsOne); + if (runAsOne) { + statementOperationsMap.put(context.getParagraphId(), new ArrayList<>()); + } + String jobName = context.getLocalProperties().get("jobName"); if (StringUtils.isNotBlank(jobName)) { tbenv.getConfig().getConfiguration().set(PipelineOptions.NAME, jobName); @@ -264,7 +270,6 @@ public class Flink114SqlInterpreter { return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e)); } finally { statementOperationsMap.remove(context.getParagraphId()); - statementModeMap.remove(context.getParagraphId()); } return new InterpreterResult(InterpreterResult.Code.SUCCESS); @@ -359,10 +364,9 @@ public class Flink114SqlInterpreter { } private void callInsert(CatalogSinkModifyOperation operation, InterpreterContext context) throws IOException { - if (statementModeMap.getOrDefault(context.getParagraphId(), false)) { - List<ModifyOperation> modifyOperations = statementOperationsMap.getOrDefault(context.getParagraphId(), new ArrayList<>()); + if (statementOperationsMap.containsKey(context.getParagraphId())) { + List<ModifyOperation> modifyOperations = statementOperationsMap.get(context.getParagraphId()); modifyOperations.add(operation); - statementOperationsMap.put(context.getParagraphId(), modifyOperations); } else { callInserts(Collections.singletonList(operation), context); } @@ -472,7 +476,7 @@ public class Flink114SqlInterpreter { } private void callBeginStatementSet(InterpreterContext context) throws IOException { - statementModeMap.put(context.getParagraphId(), true); + statementOperationsMap.put(context.getParagraphId(), new ArrayList<>()); } private void callEndStatementSet(InterpreterContext context) throws IOException { @@ -482,7 +486,6 @@ public class Flink114SqlInterpreter { } else { context.out.write(MESSAGE_NO_STATEMENT_IN_STATEMENT_SET); } - statementModeMap.remove(context.getParagraphId()); } private void callUseCatalog(String catalog, InterpreterContext context) throws IOException {