This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new 6de64ac  [FLINK-16217][sql-client] Fix exception catching to avoid SQL client crashes
6de64ac is described below

commit 6de64ac202d1f0ac97df567208764e1547540c6e
Author: godfreyhe <godfre...@163.com>
AuthorDate: Fri Mar 13 13:38:03 2020 +0800

    [FLINK-16217][sql-client] Fix exception catching to avoid SQL client crashes
    
    This closes #11397.
---
 .../org/apache/flink/table/client/cli/CliClient.java   | 14 ++++++++++++--
 .../table/client/gateway/local/LocalExecutor.java      |  9 ++++++---
 .../apache/flink/table/client/cli/CliClientTest.java   | 18 +++---------------
 .../flink/table/client/cli/utils/TerminalUtils.java    |  5 ++++-
 4 files changed, 25 insertions(+), 21 deletions(-)

diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
index 797bca6..a0ec166 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
@@ -340,7 +340,12 @@ public class CliClient {
        }
 
        private void callReset() {
-               executor.resetSessionProperties(sessionId);
+               try {
+                       executor.resetSessionProperties(sessionId);
+               } catch (SqlExecutionException e) {
+                       printExecutionException(e);
+                       return;
+               }
                printInfo(CliStrings.MESSAGE_RESET);
        }
 
@@ -367,7 +372,12 @@ public class CliClient {
                }
                // set a property
                else {
-                       executor.setSessionProperty(sessionId, cmdCall.operands[0], cmdCall.operands[1]);
+                       try {
+                               executor.setSessionProperty(sessionId, cmdCall.operands[0], cmdCall.operands[1].trim());
+                       } catch (SqlExecutionException e) {
+                               printExecutionException(e);
+                               return;
+                       }
                        terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_SET).toAnsi());
                }
                terminal.flush();
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index 3665735..b59c6bf 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -63,8 +63,6 @@ import org.apache.flink.types.Row;
 import org.apache.flink.util.JarUtils;
 import org.apache.flink.util.StringUtils;
 
-import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap;
-
 import org.apache.commons.cli.Options;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -282,7 +280,12 @@ public class LocalExecutor implements Executor {
        public void setSessionProperty(String sessionId, String key, String value) throws SqlExecutionException {
                ExecutionContext<?> context = getExecutionContext(sessionId);
                Environment env = context.getEnvironment();
-               Environment newEnv = Environment.enrich(env, ImmutableMap.of(key, value), ImmutableMap.of());
+               Environment newEnv;
+               try {
+                       newEnv = Environment.enrich(env, Collections.singletonMap(key, value), Collections.emptyMap());
+               } catch (Throwable t) {
+                       throw new SqlExecutionException("Could not set session property.", t);
+               }
 
                // Renew the ExecutionContext by new environment.
                // Book keep all the session states of current ExecutionContext then
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
index 3016724..aa2c3a1 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.client.cli;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.client.cli.utils.TerminalUtils;
+import org.apache.flink.table.client.cli.utils.TerminalUtils.MockOutputStream;
 import org.apache.flink.table.client.config.Environment;
 import org.apache.flink.table.client.config.entries.ViewEntry;
 import org.apache.flink.table.client.gateway.Executor;
@@ -44,7 +45,6 @@ import org.junit.Test;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -103,17 +103,11 @@ public class CliClientTest extends TestLogger {
                Executor executor = mock(Executor.class);
                doThrow(new SqlExecutionException("mocked exception")).when(executor).useDatabase(any(), any());
                InputStream inputStream = new ByteArrayInputStream("use db;\n".getBytes());
-               // don't care about the output
-               OutputStream outputStream = new OutputStream() {
-                       @Override
-                       public void write(int b) throws IOException {
-                       }
-               };
                SessionContext session = new SessionContext("test-session", new Environment());
                String sessionId = executor.openSession(session);
 
                CliClient cliClient = null;
-               try (Terminal terminal = new DumbTerminal(inputStream, outputStream)) {
+               try (Terminal terminal = new DumbTerminal(inputStream, new MockOutputStream())) {
                        cliClient = new CliClient(terminal, sessionId, executor);
                        cliClient.open();
                        verify(executor).useDatabase(any(), any());
@@ -129,17 +123,11 @@ public class CliClientTest extends TestLogger {
                Executor executor = mock(Executor.class);
                doThrow(new SqlExecutionException("mocked exception")).when(executor).useCatalog(any(), any());
                InputStream inputStream = new ByteArrayInputStream("use catalog cat;\n".getBytes());
-               // don't care about the output
-               OutputStream outputStream = new OutputStream() {
-                       @Override
-                       public void write(int b) throws IOException {
-                       }
-               };
                CliClient cliClient = null;
                SessionContext sessionContext = new SessionContext("test-session", new Environment());
                String sessionId = executor.openSession(sessionContext);
 
-               try (Terminal terminal = new DumbTerminal(inputStream, outputStream)) {
+               try (Terminal terminal = new DumbTerminal(inputStream, new MockOutputStream())) {
                        cliClient = new CliClient(terminal, sessionId, executor);
                        cliClient.open();
                        verify(executor).useCatalog(any(), any());
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/utils/TerminalUtils.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/utils/TerminalUtils.java
index 772fb01..d2b851d 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/utils/TerminalUtils.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/utils/TerminalUtils.java
@@ -50,7 +50,10 @@ public class TerminalUtils {
                }
        }
 
-       private static class MockOutputStream extends OutputStream {
+       /**
+        * A mock {@link OutputStream} for testing.
+        */
+       public static class MockOutputStream extends OutputStream {
 
                @Override
                public void write(int b) {

Reply via email to