This is an automated email from the ASF dual-hosted git repository. kurt pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push: new 4a5a720 [FLINK-15073][sql client] Sql client fails to run same query multiple times 4a5a720 is described below commit 4a5a720024992c12bbfd4fb316d04f24d23a109e Author: yuzhao.cyz <yuzhao....@gmail.com> AuthorDate: Mon Dec 9 14:46:44 2019 +0800 [FLINK-15073][sql client] Sql client fails to run same query multiple times After we changed the SQL-CLI to stateful in FLINK-14672, each query's temporary table was left behind, so we could not re-register the same object (from the same query). This closes #10523 (cherry picked from commit 7bf96cf5fbd76377f5054c3b3f6552615a94c11d) --- .../table/client/gateway/local/LocalExecutor.java | 12 +- .../client/gateway/local/LocalExecutorITCase.java | 121 +++++++++++++++++++++ 2 files changed, 130 insertions(+), 3 deletions(-) diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java index c9f50d0..1174b09 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java @@ -619,16 +619,16 @@ public class LocalExecutor implements Executor { removeTimeAttributes(table.getSchema()), context.getExecutionConfig(), context.getClassLoader()); - final String jobName = sessionId + ": " + query; + final String tableName = String.format("_tmp_table_%s", Math.abs(query.hashCode())); final Pipeline pipeline; try { // writing to a sink requires an optimization step that might reference UDFs during code compilation context.wrapClassLoader(() -> { - context.getTableEnvironment().registerTableSink(jobName, result.getTableSink()); + context.getTableEnvironment().registerTableSink(tableName, result.getTableSink()); table.insertInto( 
context.getQueryConfig(), - jobName); + tableName); return null; }); pipeline = context.createPipeline(jobName, context.getFlinkConfig()); @@ -638,6 +638,12 @@ public class LocalExecutor implements Executor { result.close(); // catch everything such that the query does not crash the executor throw new SqlExecutionException("Invalid SQL query.", t); + } finally { + // Remove the temporal table object. + context.wrapClassLoader(() -> { + context.getTableEnvironment().dropTemporaryTable(tableName); + return null; + }); } // store the result with a unique id diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java index c7a19bf..10c797f 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java @@ -450,6 +450,50 @@ public class LocalExecutorITCase extends TestLogger { } } + @Test(timeout = 90_000L) + public void testStreamQueryExecutionChangelogMultipleTimes() throws Exception { + final URL url = getClass().getClassLoader().getResource("test-data.csv"); + Objects.requireNonNull(url); + final Map<String, String> replaceVars = new HashMap<>(); + replaceVars.put("$VAR_PLANNER", planner); + replaceVars.put("$VAR_SOURCE_PATH1", url.getPath()); + replaceVars.put("$VAR_EXECUTION_TYPE", "streaming"); + replaceVars.put("$VAR_RESULT_MODE", "changelog"); + replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append"); + replaceVars.put("$VAR_MAX_ROWS", "100"); + + final Executor executor = createModifiedExecutor(clusterClient, replaceVars); + final SessionContext session = new SessionContext("test-session", new Environment()); + String sessionId = executor.openSession(session); + assertEquals("test-session", sessionId); + + 
final List<String> expectedResults = new ArrayList<>(); + expectedResults.add("(true,47,Hello World)"); + expectedResults.add("(true,27,Hello World)"); + expectedResults.add("(true,37,Hello World)"); + expectedResults.add("(true,37,Hello World)"); + expectedResults.add("(true,47,Hello World)"); + expectedResults.add("(true,57,Hello World!!!!)"); + + try { + for (int i = 0; i < 3; i++) { + // start job and retrieval + final ResultDescriptor desc = executor.executeQuery( + sessionId, + "SELECT scalarUDF(IntegerField1), StringField1 FROM TableNumber1"); + + assertFalse(desc.isMaterialized()); + + final List<String> actualResults = + retrieveChangelogResult(executor, sessionId, desc.getResultId()); + + TestBaseUtils.compareResultCollections(expectedResults, actualResults, Comparator.naturalOrder()); + } + } finally { + executor.closeSession(sessionId); + } + } + @Test(timeout = 30_000L) public void testStreamQueryExecutionTable() throws Exception { final URL url = getClass().getClassLoader().getResource("test-data.csv"); @@ -476,6 +520,43 @@ public class LocalExecutorITCase extends TestLogger { executeStreamQueryTable(replaceVars, query, expectedResults); } + @Test(timeout = 90_000L) + public void testStreamQueryExecutionTableMultipleTimes() throws Exception { + final URL url = getClass().getClassLoader().getResource("test-data.csv"); + Objects.requireNonNull(url); + + final Map<String, String> replaceVars = new HashMap<>(); + replaceVars.put("$VAR_PLANNER", planner); + replaceVars.put("$VAR_SOURCE_PATH1", url.getPath()); + replaceVars.put("$VAR_EXECUTION_TYPE", "streaming"); + replaceVars.put("$VAR_RESULT_MODE", "table"); + replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append"); + replaceVars.put("$VAR_MAX_ROWS", "100"); + + final String query = "SELECT scalarUDF(IntegerField1), StringField1 FROM TableNumber1"; + + final List<String> expectedResults = new ArrayList<>(); + expectedResults.add("47,Hello World"); + expectedResults.add("27,Hello World"); + 
expectedResults.add("37,Hello World"); + expectedResults.add("37,Hello World"); + expectedResults.add("47,Hello World"); + expectedResults.add("57,Hello World!!!!"); + + final Executor executor = createModifiedExecutor(clusterClient, replaceVars); + final SessionContext session = new SessionContext("test-session", new Environment()); + String sessionId = executor.openSession(session); + assertEquals("test-session", sessionId); + + try { + for (int i = 0; i < 3; i++) { + executeStreamQueryTable(replaceVars, query, expectedResults); + } + } finally { + executor.closeSession(sessionId); + } + } + @Test(timeout = 30_000L) public void testStreamQueryExecutionLimitedTable() throws Exception { final URL url = getClass().getClassLoader().getResource("test-data.csv"); @@ -535,6 +616,46 @@ public class LocalExecutorITCase extends TestLogger { } } + @Test(timeout = 90_000L) + public void testBatchQueryExecutionMultipleTimes() throws Exception { + final URL url = getClass().getClassLoader().getResource("test-data.csv"); + Objects.requireNonNull(url); + final Map<String, String> replaceVars = new HashMap<>(); + replaceVars.put("$VAR_PLANNER", planner); + replaceVars.put("$VAR_SOURCE_PATH1", url.getPath()); + replaceVars.put("$VAR_EXECUTION_TYPE", "batch"); + replaceVars.put("$VAR_RESULT_MODE", "table"); + replaceVars.put("$VAR_UPDATE_MODE", ""); + replaceVars.put("$VAR_MAX_ROWS", "100"); + + final Executor executor = createModifiedExecutor(clusterClient, replaceVars); + final SessionContext session = new SessionContext("test-session", new Environment()); + String sessionId = executor.openSession(session); + assertEquals("test-session", sessionId); + + final List<String> expectedResults = new ArrayList<>(); + expectedResults.add("47"); + expectedResults.add("27"); + expectedResults.add("37"); + expectedResults.add("37"); + expectedResults.add("47"); + expectedResults.add("57"); + + try { + for (int i = 0; i < 3; i++) { + final ResultDescriptor desc = 
executor.executeQuery(sessionId, "SELECT * FROM TestView1"); + + assertTrue(desc.isMaterialized()); + + final List<String> actualResults = retrieveTableResult(executor, sessionId, desc.getResultId()); + + TestBaseUtils.compareResultCollections(expectedResults, actualResults, Comparator.naturalOrder()); + } + } finally { + executor.closeSession(sessionId); + } + } + @Test(timeout = 30_000L) public void ensureExceptionOnFaultySourceInStreamingChangelogMode() throws Exception { final String missingFileName = "missing-source";