This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new 4a5a720  [FLINK-15073][sql client] Sql client fails to run same query 
multiple times
4a5a720 is described below

commit 4a5a720024992c12bbfd4fb316d04f24d23a109e
Author: yuzhao.cyz <yuzhao....@gmail.com>
AuthorDate: Mon Dec 9 14:46:44 2019 +0800

    [FLINK-15073][sql client] Sql client fails to run same query multiple times
    
    After we changed the SQL CLI to be stateful in FLINK-14672, each query's
    temporary table was left registered (never dropped), so we cannot
    re-register the same object (from the same query).
    
    This closes #10523
    
    (cherry picked from commit 7bf96cf5fbd76377f5054c3b3f6552615a94c11d)
---
 .../table/client/gateway/local/LocalExecutor.java  |  12 +-
 .../client/gateway/local/LocalExecutorITCase.java  | 121 +++++++++++++++++++++
 2 files changed, 130 insertions(+), 3 deletions(-)

diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index c9f50d0..1174b09 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -619,16 +619,16 @@ public class LocalExecutor implements Executor {
                                removeTimeAttributes(table.getSchema()),
                                context.getExecutionConfig(),
                                context.getClassLoader());
-
                final String jobName = sessionId + ": " + query;
+               final String tableName = String.format("_tmp_table_%s", 
Math.abs(query.hashCode()));
                final Pipeline pipeline;
                try {
                        // writing to a sink requires an optimization step that 
might reference UDFs during code compilation
                        context.wrapClassLoader(() -> {
-                               
context.getTableEnvironment().registerTableSink(jobName, result.getTableSink());
+                               
context.getTableEnvironment().registerTableSink(tableName, 
result.getTableSink());
                                table.insertInto(
                                                context.getQueryConfig(),
-                                               jobName);
+                                               tableName);
                                return null;
                        });
                        pipeline = context.createPipeline(jobName, 
context.getFlinkConfig());
@@ -638,6 +638,12 @@ public class LocalExecutor implements Executor {
                        result.close();
                        // catch everything such that the query does not crash 
the executor
                        throw new SqlExecutionException("Invalid SQL query.", 
t);
+               } finally {
+                       // Remove the temporal table object.
+                       context.wrapClassLoader(() -> {
+                               
context.getTableEnvironment().dropTemporaryTable(tableName);
+                               return null;
+                       });
                }
 
                // store the result with a unique id
diff --git 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index c7a19bf..10c797f 100644
--- 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -450,6 +450,50 @@ public class LocalExecutorITCase extends TestLogger {
                }
        }
 
+       @Test(timeout = 90_000L)
+       public void testStreamQueryExecutionChangelogMultipleTimes() throws 
Exception {
+               final URL url = 
getClass().getClassLoader().getResource("test-data.csv");
+               Objects.requireNonNull(url);
+               final Map<String, String> replaceVars = new HashMap<>();
+               replaceVars.put("$VAR_PLANNER", planner);
+               replaceVars.put("$VAR_SOURCE_PATH1", url.getPath());
+               replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
+               replaceVars.put("$VAR_RESULT_MODE", "changelog");
+               replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
+               replaceVars.put("$VAR_MAX_ROWS", "100");
+
+               final Executor executor = createModifiedExecutor(clusterClient, 
replaceVars);
+               final SessionContext session = new 
SessionContext("test-session", new Environment());
+               String sessionId = executor.openSession(session);
+               assertEquals("test-session", sessionId);
+
+               final List<String> expectedResults = new ArrayList<>();
+               expectedResults.add("(true,47,Hello World)");
+               expectedResults.add("(true,27,Hello World)");
+               expectedResults.add("(true,37,Hello World)");
+               expectedResults.add("(true,37,Hello World)");
+               expectedResults.add("(true,47,Hello World)");
+               expectedResults.add("(true,57,Hello World!!!!)");
+
+               try {
+                       for (int i = 0; i < 3; i++) {
+                               // start job and retrieval
+                               final ResultDescriptor desc = 
executor.executeQuery(
+                                               sessionId,
+                                               "SELECT 
scalarUDF(IntegerField1), StringField1 FROM TableNumber1");
+
+                               assertFalse(desc.isMaterialized());
+
+                               final List<String> actualResults =
+                                               
retrieveChangelogResult(executor, sessionId, desc.getResultId());
+
+                               
TestBaseUtils.compareResultCollections(expectedResults, actualResults, 
Comparator.naturalOrder());
+                       }
+               } finally {
+                       executor.closeSession(sessionId);
+               }
+       }
+
        @Test(timeout = 30_000L)
        public void testStreamQueryExecutionTable() throws Exception {
                final URL url = 
getClass().getClassLoader().getResource("test-data.csv");
@@ -476,6 +520,43 @@ public class LocalExecutorITCase extends TestLogger {
                executeStreamQueryTable(replaceVars, query, expectedResults);
        }
 
+       @Test(timeout = 90_000L)
+       public void testStreamQueryExecutionTableMultipleTimes() throws 
Exception {
+               final URL url = 
getClass().getClassLoader().getResource("test-data.csv");
+               Objects.requireNonNull(url);
+
+               final Map<String, String> replaceVars = new HashMap<>();
+               replaceVars.put("$VAR_PLANNER", planner);
+               replaceVars.put("$VAR_SOURCE_PATH1", url.getPath());
+               replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
+               replaceVars.put("$VAR_RESULT_MODE", "table");
+               replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
+               replaceVars.put("$VAR_MAX_ROWS", "100");
+
+               final String query = "SELECT scalarUDF(IntegerField1), 
StringField1 FROM TableNumber1";
+
+               final List<String> expectedResults = new ArrayList<>();
+               expectedResults.add("47,Hello World");
+               expectedResults.add("27,Hello World");
+               expectedResults.add("37,Hello World");
+               expectedResults.add("37,Hello World");
+               expectedResults.add("47,Hello World");
+               expectedResults.add("57,Hello World!!!!");
+
+               final Executor executor = createModifiedExecutor(clusterClient, 
replaceVars);
+               final SessionContext session = new 
SessionContext("test-session", new Environment());
+               String sessionId = executor.openSession(session);
+               assertEquals("test-session", sessionId);
+
+               try {
+                       for (int i = 0; i < 3; i++) {
+                               executeStreamQueryTable(replaceVars, query, 
expectedResults);
+                       }
+               } finally {
+                       executor.closeSession(sessionId);
+               }
+       }
+
        @Test(timeout = 30_000L)
        public void testStreamQueryExecutionLimitedTable() throws Exception {
                final URL url = 
getClass().getClassLoader().getResource("test-data.csv");
@@ -535,6 +616,46 @@ public class LocalExecutorITCase extends TestLogger {
                }
        }
 
+       @Test(timeout = 90_000L)
+       public void testBatchQueryExecutionMultipleTimes() throws Exception {
+               final URL url = 
getClass().getClassLoader().getResource("test-data.csv");
+               Objects.requireNonNull(url);
+               final Map<String, String> replaceVars = new HashMap<>();
+               replaceVars.put("$VAR_PLANNER", planner);
+               replaceVars.put("$VAR_SOURCE_PATH1", url.getPath());
+               replaceVars.put("$VAR_EXECUTION_TYPE", "batch");
+               replaceVars.put("$VAR_RESULT_MODE", "table");
+               replaceVars.put("$VAR_UPDATE_MODE", "");
+               replaceVars.put("$VAR_MAX_ROWS", "100");
+
+               final Executor executor = createModifiedExecutor(clusterClient, 
replaceVars);
+               final SessionContext session = new 
SessionContext("test-session", new Environment());
+               String sessionId = executor.openSession(session);
+               assertEquals("test-session", sessionId);
+
+               final List<String> expectedResults = new ArrayList<>();
+               expectedResults.add("47");
+               expectedResults.add("27");
+               expectedResults.add("37");
+               expectedResults.add("37");
+               expectedResults.add("47");
+               expectedResults.add("57");
+
+               try {
+                       for (int i = 0; i < 3; i++) {
+                               final ResultDescriptor desc = 
executor.executeQuery(sessionId, "SELECT * FROM TestView1");
+
+                               assertTrue(desc.isMaterialized());
+
+                               final List<String> actualResults = 
retrieveTableResult(executor, sessionId, desc.getResultId());
+
+                               
TestBaseUtils.compareResultCollections(expectedResults, actualResults, 
Comparator.naturalOrder());
+                       }
+               } finally {
+                       executor.closeSession(sessionId);
+               }
+       }
+
        @Test(timeout = 30_000L)
        public void ensureExceptionOnFaultySourceInStreamingChangelogMode() 
throws Exception {
                final String missingFileName = "missing-source";

Reply via email to