This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch FLINK-31223-rest-ssl-fix in repository https://gitbox.apache.org/repos/asf/flink.git
commit 43e1ee5061a2fe8660220930f31b11055801c97f Author: Weijie Guo <res...@163.com> AuthorDate: Sun Feb 26 23:13:45 2023 +0800 [FLINK-31223][test] Abstract SqlClientTestBase. --- .../apache/flink/table/client/SqlClientTest.java | 86 +-------------- .../flink/table/client/SqlClientTestBase.java | 119 +++++++++++++++++++++ .../rest/util/SqlGatewayRestEndpointExtension.java | 14 ++- 3 files changed, 134 insertions(+), 85 deletions(-) diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java index be1ac347dd9..87f5e449340 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java @@ -19,49 +19,33 @@ package org.apache.flink.table.client; import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.testutils.CommonTestUtils; -import org.apache.flink.table.client.cli.TerminalUtils; import org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointExtension; import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension; import org.apache.flink.util.FileUtils; import org.apache.flink.util.Preconditions; -import org.jline.terminal.Size; -import org.jline.terminal.Terminal; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -import org.junit.jupiter.api.io.TempDir; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.File; +import java.net.InetSocketAddress; import java.io.IOException; import java.io.OutputStream; import java.net.URL; -import java.nio.charset.StandardCharsets; import java.nio.file.Files; -import java.nio.file.Path; import java.nio.file.Paths; -import java.nio.file.StandardOpenOption; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_CONF_DIR; import static org.apache.flink.configuration.DeploymentOptions.TARGET; import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for {@link SqlClient}. */ -class SqlClientTest { - - @TempDir private Path tempFolder; +class SqlClientTest extends SqlClientTestBase { @RegisterExtension @Order(1) @@ -78,34 +62,6 @@ class SqlClientTest { private static final SqlGatewayRestEndpointExtension SQL_GATEWAY_REST_ENDPOINT_EXTENSION = new SqlGatewayRestEndpointExtension(SQL_GATEWAY_SERVICE_EXTENSION::getService); - private Map<String, String> originalEnv; - - private String historyPath; - - @BeforeEach - void before() throws IOException { - originalEnv = System.getenv(); - - // prepare conf dir - File confFolder = Files.createTempDirectory(tempFolder, "conf").toFile(); - File confYaml = new File(confFolder, "config.yaml"); - if (!confYaml.createNewFile()) { - throw new IOException("Can't create testing config.yaml file."); - } - - // adjust the test environment for the purposes of this test - Map<String, String> map = new HashMap<>(System.getenv()); - map.put(ENV_FLINK_CONF_DIR, confFolder.getAbsolutePath()); - CommonTestUtils.setEnv(map); - - historyPath = Files.createTempFile(tempFolder, "history", "").toFile().getPath(); - } - - @AfterEach - void after() { - CommonTestUtils.setEnv(originalEnv); - } - @Test void testEmbeddedWithOptions() throws Exception { String[] args = new String[] {"embedded", "-hist", historyPath}; @@ -332,42 +288,4 @@ class SqlClientTest { .toURI()))); assertThat(runSqlClient(args)).isEqualTo(actual); } - - private String runSqlClient(String[] args) throws Exception { - return runSqlClient(args, "QUIT;\n", false); - } - - private String runSqlClient(String[] args, String statements, boolean printInput) - throws Exception { - try (OutputStream out = new ByteArrayOutputStream(); - Terminal terminal = - TerminalUtils.createDumbTerminal( - new ByteArrayInputStream( - statements.getBytes(StandardCharsets.UTF_8)), - out)) { - if (printInput) { - // The default terminal has an empty size. Here increase the terminal to allow - // the line reader print the input string. - terminal.setSize(new Size(160, 80)); - } - SqlClient.startClient(args, () -> terminal); - return out.toString().replace("\r\n", System.lineSeparator()); - } - } - - private String createSqlFile(List<String> statements, String name) throws IOException { - // create sql file - File sqlFileFolder = Files.createTempDirectory(tempFolder, "sql-file").toFile(); - File sqlFile = new File(sqlFileFolder, name); - if (!sqlFile.createNewFile()) { - throw new IOException(String.format("Can't create testing %s.", name)); - } - String sqlFilePath = sqlFile.getPath(); - Files.write( - Paths.get(sqlFilePath), - statements, - StandardCharsets.UTF_8, - StandardOpenOption.APPEND); - return sqlFilePath; - } } diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTestBase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTestBase.java new file mode 100644 index 00000000000..d778524ecad --- /dev/null +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTestBase.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client; + +import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.table.client.cli.TerminalUtils; + +import org.jline.terminal.Size; +import org.jline.terminal.Terminal; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.io.TempDir; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_CONF_DIR; + +/** Base class for test {@link SqlClient}. */ +class SqlClientTestBase { + @TempDir private Path tempFolder; + + protected String historyPath; + + protected Map<String, String> originalEnv; + + @BeforeEach + void before() throws IOException { + originalEnv = System.getenv(); + + // prepare conf dir + File confFolder = Files.createTempDirectory(tempFolder, "conf").toFile(); + File confYaml = new File(confFolder, "config.yaml"); + if (!confYaml.createNewFile()) { + throw new IOException("Can't create testing config.yaml file."); + } + writeConfigOptionsToConfYaml(confYaml.toPath()); + // adjust the test environment for the purposes of this test + Map<String, String> map = new HashMap<>(System.getenv()); + map.put(ENV_FLINK_CONF_DIR, confFolder.getAbsolutePath()); + CommonTestUtils.setEnv(map); + + historyPath = Files.createTempFile(tempFolder, "history", "").toFile().getPath(); + } + + @AfterEach + void after() { + CommonTestUtils.setEnv(originalEnv); + } + + protected String createSqlFile(List<String> statements, String name) throws IOException { + // create sql file + File sqlFileFolder = Files.createTempDirectory(tempFolder, "sql-file").toFile(); + File sqlFile = new File(sqlFileFolder, name); + if (!sqlFile.createNewFile()) { + throw new IOException(String.format("Can't create testing %s.", name)); + } + String sqlFilePath = sqlFile.getPath(); + Files.write( + Paths.get(sqlFilePath), + statements, + StandardCharsets.UTF_8, + StandardOpenOption.APPEND); + return sqlFilePath; + } + + public static String runSqlClient(String[] args) throws Exception { + return runSqlClient(args, "QUIT;\n", false); + } + + public static String runSqlClient(String[] args, String statements, boolean printInput) + throws Exception { + try (OutputStream out = new ByteArrayOutputStream(); + Terminal terminal = + TerminalUtils.createDumbTerminal( + new ByteArrayInputStream( + statements.getBytes(StandardCharsets.UTF_8)), + out)) { + if (printInput) { + // The default terminal has an empty size. Here increase the terminal to allow + // the line reader print the input string. + terminal.setSize(new Size(160, 80)); + } + SqlClient.startClient(args, () -> terminal); + return out.toString().replace("\r\n", System.lineSeparator()); + } + } + + protected void writeConfigOptionsToConfYaml(Path confYamlPath) throws IOException { + // no-op for default. + } +} diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestEndpointExtension.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestEndpointExtension.java index e694a0057b6..80a78068e9c 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestEndpointExtension.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestEndpointExtension.java @@ -30,6 +30,7 @@ import org.junit.jupiter.api.extension.ExtensionContext; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.util.function.Consumer; import java.util.function.Supplier; import static org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointTestUtils.getBaseConfig; @@ -41,6 +42,8 @@ public class SqlGatewayRestEndpointExtension implements BeforeAllCallback, After private final Supplier<SqlGatewayService> serviceSupplier; + private final Consumer<Configuration> flinkConfConsumer; + private SqlGatewayRestEndpoint sqlGatewayRestEndpoint; private SqlGatewayService sqlGatewayService; private String targetAddress; @@ -59,13 +62,22 @@ public class SqlGatewayRestEndpointExtension implements BeforeAllCallback, After } public SqlGatewayRestEndpointExtension(Supplier<SqlGatewayService> serviceSupplier) { + this(serviceSupplier, (conf) -> {}); + } + + public SqlGatewayRestEndpointExtension( + Supplier<SqlGatewayService> serviceSupplier, + Consumer<Configuration> flinkConfConsumer) { this.serviceSupplier = serviceSupplier; + this.flinkConfConsumer = flinkConfConsumer; } @Override public void beforeAll(ExtensionContext context) { String address = InetAddress.getLoopbackAddress().getHostAddress(); - Configuration config = getBaseConfig(getFlinkConfig(address, address, "0")); + Configuration flinkConfig = getFlinkConfig(address, address, "0"); + flinkConfConsumer.accept(flinkConfig); + Configuration config = getBaseConfig(flinkConfig); try { sqlGatewayService = serviceSupplier.get();