This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push: new 6de64ac [FLINK-16217][sql-client] Fix exception catching to avoid SQL client crashes 6de64ac is described below commit 6de64ac202d1f0ac97df567208764e1547540c6e Author: godfreyhe <godfre...@163.com> AuthorDate: Fri Mar 13 13:38:03 2020 +0800 [FLINK-16217][sql-client] Fix exception catching to avoid SQL client crashes This closes #11397. --- .../org/apache/flink/table/client/cli/CliClient.java | 14 ++++++++++++-- .../table/client/gateway/local/LocalExecutor.java | 9 ++++++--- .../apache/flink/table/client/cli/CliClientTest.java | 18 +++--------------- .../flink/table/client/cli/utils/TerminalUtils.java | 5 ++++- 4 files changed, 25 insertions(+), 21 deletions(-) diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java index 797bca6..a0ec166 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java @@ -340,7 +340,12 @@ public class CliClient { } private void callReset() { - executor.resetSessionProperties(sessionId); + try { + executor.resetSessionProperties(sessionId); + } catch (SqlExecutionException e) { + printExecutionException(e); + return; + } printInfo(CliStrings.MESSAGE_RESET); } @@ -367,7 +372,12 @@ public class CliClient { } // set a property else { - executor.setSessionProperty(sessionId, cmdCall.operands[0], cmdCall.operands[1]); + try { + executor.setSessionProperty(sessionId, cmdCall.operands[0], cmdCall.operands[1].trim()); + } catch (SqlExecutionException e) { + printExecutionException(e); + return; + } terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_SET).toAnsi()); } terminal.flush(); diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java index 3665735..b59c6bf 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java @@ -63,8 +63,6 @@ import org.apache.flink.types.Row; import org.apache.flink.util.JarUtils; import org.apache.flink.util.StringUtils; -import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap; - import org.apache.commons.cli.Options; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -282,7 +280,12 @@ public class LocalExecutor implements Executor { public void setSessionProperty(String sessionId, String key, String value) throws SqlExecutionException { ExecutionContext<?> context = getExecutionContext(sessionId); Environment env = context.getEnvironment(); - Environment newEnv = Environment.enrich(env, ImmutableMap.of(key, value), ImmutableMap.of()); + Environment newEnv; + try { + newEnv = Environment.enrich(env, Collections.singletonMap(key, value), Collections.emptyMap()); + } catch (Throwable t) { + throw new SqlExecutionException("Could not set session property.", t); + } // Renew the ExecutionContext by new environment. // Book keep all the session states of current ExecutionContext then diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java index 3016724..aa2c3a1 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java @@ -21,6 +21,7 @@ package org.apache.flink.table.client.cli; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.client.cli.utils.TerminalUtils; +import org.apache.flink.table.client.cli.utils.TerminalUtils.MockOutputStream; import org.apache.flink.table.client.config.Environment; import org.apache.flink.table.client.config.entries.ViewEntry; import org.apache.flink.table.client.gateway.Executor; @@ -44,7 +45,6 @@ import org.junit.Test; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; -import java.io.OutputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -103,17 +103,11 @@ public class CliClientTest extends TestLogger { Executor executor = mock(Executor.class); doThrow(new SqlExecutionException("mocked exception")).when(executor).useDatabase(any(), any()); InputStream inputStream = new ByteArrayInputStream("use db;\n".getBytes()); - // don't care about the output - OutputStream outputStream = new OutputStream() { - @Override - public void write(int b) throws IOException { - } - }; SessionContext session = new SessionContext("test-session", new Environment()); String sessionId = executor.openSession(session); CliClient cliClient = null; - try (Terminal terminal = new DumbTerminal(inputStream, outputStream)) { + try (Terminal terminal = new DumbTerminal(inputStream, new MockOutputStream())) { cliClient = new CliClient(terminal, sessionId, executor); cliClient.open(); verify(executor).useDatabase(any(), any()); @@ -129,17 +123,11 @@ public class CliClientTest extends TestLogger { Executor executor = mock(Executor.class); doThrow(new SqlExecutionException("mocked exception")).when(executor).useCatalog(any(), any()); InputStream inputStream = new ByteArrayInputStream("use catalog cat;\n".getBytes()); - // don't care about the output - OutputStream outputStream = new OutputStream() { - @Override - public void write(int b) throws IOException { - } - }; CliClient cliClient = null; SessionContext sessionContext = new SessionContext("test-session", new Environment()); String sessionId = executor.openSession(sessionContext); - try (Terminal terminal = new DumbTerminal(inputStream, outputStream)) { + try (Terminal terminal = new DumbTerminal(inputStream, new MockOutputStream())) { cliClient = new CliClient(terminal, sessionId, executor); cliClient.open(); verify(executor).useCatalog(any(), any()); diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/utils/TerminalUtils.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/utils/TerminalUtils.java index 772fb01..d2b851d 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/utils/TerminalUtils.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/utils/TerminalUtils.java @@ -50,7 +50,10 @@ public class TerminalUtils { } } - private static class MockOutputStream extends OutputStream { + /** + * A mock {@link OutputStream} for testing. + */ + public static class MockOutputStream extends OutputStream { @Override public void write(int b) {