This is an automated email from the ASF dual-hosted git repository. libenchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 8c44d58c4c4 [FLINK-31545][jdbc-driver] Create executor in flink connection 8c44d58c4c4 is described below commit 8c44d58c4c4aeb36c91d8b27f4128891970dc47d Author: shammon FY <zjur...@gmail.com> AuthorDate: Wed Mar 29 10:47:11 2023 +0800 [FLINK-31545][jdbc-driver] Create executor in flink connection Close apache/flink#22289 --- flink-table/flink-sql-jdbc-driver-bundle/pom.xml | 28 +++++ flink-table/flink-sql-jdbc-driver/pom.xml | 23 +++- .../org/apache/flink/table/jdbc/DriverUri.java | 15 +-- .../apache/flink/table/jdbc/FlinkConnection.java | 96 +++++++++++++-- .../flink/table/jdbc/FlinkConnectionTest.java | 136 +++++++++++++++++++++ .../apache/flink/table/jdbc/FlinkDriverTest.java | 8 +- 6 files changed, 279 insertions(+), 27 deletions(-) diff --git a/flink-table/flink-sql-jdbc-driver-bundle/pom.xml b/flink-table/flink-sql-jdbc-driver-bundle/pom.xml index 9ad99ffee9f..9d43bdb9be4 100644 --- a/flink-table/flink-sql-jdbc-driver-bundle/pom.xml +++ b/flink-table/flink-sql-jdbc-driver-bundle/pom.xml @@ -50,15 +50,40 @@ <artifactId>flink-sql-gateway-api</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-sql-gateway</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-annotations</artifactId> + <version>${project.version}</version> + </dependency> + + <!-- Used to create Executor in connection. TODO jdbc driver should get rid of flink-core which is a big module. --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + </dependency> </dependencies> <build> <plugins> + <plugin> + <groupId>io.github.zentol.japicmp</groupId> + <artifactId>japicmp-maven-plugin</artifactId> + <configuration> + <!-- TODO this should be removed after get rid of flink core in issue https://issues.apache.org/jira/browse/FLINK-31687. --> + <skip>true</skip> + </configuration> + </plugin> <!-- Build flink-sql-gateway jar --> <plugin> <groupId>org.apache.maven.plugins</groupId> @@ -77,7 +102,10 @@ <include>org.apache.flink:flink-sql-jdbc-driver</include> <include>org.apache.flink:flink-sql-client</include> <include>org.apache.flink:flink-sql-gateway-api</include> + <include>org.apache.flink:flink-sql-gateway</include> <include>org.apache.flink:flink-table-common</include> + <include>org.apache.flink:flink-annotations</include> + <include>org.apache.flink:flink-core</include> </includes> </artifactSet> </configuration> diff --git a/flink-table/flink-sql-jdbc-driver/pom.xml b/flink-table/flink-sql-jdbc-driver/pom.xml index 267bca001ac..3d6968fa8e4 100644 --- a/flink-table/flink-sql-jdbc-driver/pom.xml +++ b/flink-table/flink-sql-jdbc-driver/pom.xml @@ -48,9 +48,30 @@ </dependency> <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-table-api-java-bridge</artifactId> + <artifactId>flink-sql-gateway</artifactId> <version>${project.version}</version> </dependency> + + <!-- Test dependencies --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-sql-gateway</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-planner_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> </dependencies> <build> diff --git a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/DriverUri.java b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/DriverUri.java index bb7952a2417..a13650b45e7 100644 --- a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/DriverUri.java +++ b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/DriverUri.java @@ -18,6 +18,7 @@ package org.apache.flink.table.jdbc; +import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; import java.sql.SQLException; @@ -38,8 +39,7 @@ import static org.apache.flink.table.jdbc.utils.DriverUtils.isNullOrWhitespaceOn public class DriverUri { private static final String URL_PREFIX = "jdbc:"; private static final String URL_START = URL_PREFIX + "flink:"; - private final String host; - private final int port; + private final InetSocketAddress address; private final URI uri; private final Properties properties; @@ -53,19 +53,14 @@ public class DriverUri { private DriverUri(URI uri, Properties driverProperties) throws SQLException { this.uri = checkNotNull(uri, "uri is null"); - this.host = uri.getHost(); - this.port = uri.getPort(); + this.address = new InetSocketAddress(uri.getHost(), uri.getPort()); this.properties = mergeDynamicProperties(uri, driverProperties); initCatalogAndSchema(); } - public String getHost() { - return host; - } - - public int getPort() { - return port; + public InetSocketAddress getAddress() { + return address; } public Properties getProperties() { diff --git a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkConnection.java b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkConnection.java index 495e75fb83a..9af04bcd7ba 100644 --- a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkConnection.java +++ b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkConnection.java @@ -18,19 +18,40 @@ package org.apache.flink.table.jdbc; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.client.gateway.Executor; +import org.apache.flink.table.client.gateway.StatementResult; +import org.apache.flink.table.gateway.service.context.DefaultContext; +import org.apache.flink.table.jdbc.utils.DriverUtils; + import java.sql.DatabaseMetaData; import java.sql.SQLClientInfoException; import java.sql.SQLException; import java.sql.SQLFeatureNotSupportedException; import java.sql.Statement; +import java.util.Collections; import java.util.Properties; +import java.util.UUID; /** Connection to flink sql gateway for jdbc driver. */ public class FlinkConnection extends BaseConnection { - private final DriverUri driverUri; + private final Executor executor; + private volatile boolean closed = false; public FlinkConnection(DriverUri driverUri) { - this.driverUri = driverUri; + // TODO Support default context from map to get gid of flink core for jdbc driver in + // https://issues.apache.org/jira/browse/FLINK-31687. + this.executor = + Executor.create( + new DefaultContext( + Configuration.fromMap( + DriverUtils.fromProperties(driverUri.getProperties())), + Collections.emptyList()), + driverUri.getAddress(), + UUID.randomUUID().toString()); + driverUri.getCatalog().ifPresent(this::setSessionCatalog); + driverUri.getDatabase().ifPresent(this::setSessionSchema); } @Override @@ -38,14 +59,27 @@ public class FlinkConnection extends BaseConnection { throw new SQLFeatureNotSupportedException(); } + @VisibleForTesting + Executor getExecutor() { + return this.executor; + } + @Override public void close() throws SQLException { - throw new SQLFeatureNotSupportedException(); + if (closed) { + return; + } + try { + this.executor.close(); + } catch (Exception e) { + throw new SQLException("Close connection fail", e); + } + this.closed = true; } @Override public boolean isClosed() throws SQLException { - throw new SQLFeatureNotSupportedException(); + return closed; } @Override @@ -55,41 +89,79 @@ public class FlinkConnection extends BaseConnection { @Override public void setCatalog(String catalog) throws SQLException { - throw new SQLFeatureNotSupportedException(); + try { + setSessionCatalog(catalog); + } catch (Exception e) { + throw new SQLException(String.format("Set catalog[%s] fail", catalog), e); + } + } + + private void setSessionCatalog(String catalog) { + executor.configureSession(String.format("USE CATALOG %s;", catalog)); } @Override public String getCatalog() throws SQLException { - throw new SQLFeatureNotSupportedException(); + try (StatementResult result = executor.executeStatement("SHOW CURRENT CATALOG;")) { + if (result.hasNext()) { + return result.next().getString(0).toString(); + } else { + throw new SQLException("No catalog"); + } + } } @Override public void setClientInfo(String name, String value) throws SQLClientInfoException { - throw new SQLClientInfoException(); + executor.configureSession(String.format("SET '%s'='%s';", name, value)); } @Override public void setClientInfo(Properties properties) throws SQLClientInfoException { - throw new SQLClientInfoException(); + for (Object key : properties.keySet()) { + setClientInfo(key.toString(), properties.getProperty(key.toString())); + } } @Override public String getClientInfo(String name) throws SQLException { - throw new SQLFeatureNotSupportedException(); + // TODO Executor should return Map<String, String> here to get rid of flink core for jdbc + // driver in https://issues.apache.org/jira/browse/FLINK-31687. + Configuration configuration = (Configuration) executor.getSessionConfig(); + return configuration.toMap().get(name); } @Override public Properties getClientInfo() throws SQLException { - throw new SQLFeatureNotSupportedException(); + Properties properties = new Properties(); + // TODO Executor should return Map<String, String> here to get rid of flink core for jdbc + // driver in https://issues.apache.org/jira/browse/FLINK-31687. + Configuration configuration = (Configuration) executor.getSessionConfig(); + configuration.toMap().forEach(properties::setProperty); + return properties; } @Override public void setSchema(String schema) throws SQLException { - throw new SQLFeatureNotSupportedException(); + try { + setSessionSchema(schema); + } catch (Exception e) { + throw new SQLException(String.format("Set schema[%s] fail", schema), e); + } + } + + private void setSessionSchema(String schema) { + executor.configureSession(String.format("USE %s;", schema)); } @Override public String getSchema() throws SQLException { - throw new SQLFeatureNotSupportedException(); + try (StatementResult result = executor.executeStatement("SHOW CURRENT DATABASE;")) { + if (result.hasNext()) { + return result.next().getString(0).toString(); + } else { + throw new SQLException("No database"); + } + } } } diff --git a/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkConnectionTest.java b/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkConnectionTest.java new file mode 100644 index 00000000000..4b543f923ce --- /dev/null +++ b/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkConnectionTest.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.jdbc; + +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.table.client.gateway.Executor; +import org.apache.flink.table.client.gateway.SingleSessionManager; +import org.apache.flink.table.client.gateway.StatementResult; +import org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointExtension; +import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension; +import org.apache.flink.test.junit5.MiniClusterExtension; + +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.sql.SQLException; +import java.util.Properties; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrowsExactly; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** Tests for {@link FlinkConnection}. */ +public class FlinkConnectionTest { + @RegisterExtension + @Order(1) + private static final MiniClusterExtension MINI_CLUSTER_RESOURCE = + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(4) + .build()); + + @RegisterExtension + @Order(2) + public static final SqlGatewayServiceExtension SQL_GATEWAY_SERVICE_EXTENSION = + new SqlGatewayServiceExtension( + MINI_CLUSTER_RESOURCE::getClientConfiguration, SingleSessionManager::new); + + @RegisterExtension + @Order(3) + private static final SqlGatewayRestEndpointExtension SQL_GATEWAY_REST_ENDPOINT_EXTENSION = + new SqlGatewayRestEndpointExtension(SQL_GATEWAY_SERVICE_EXTENSION::getService); + + @Test + public void testCatalogSchema() throws Exception { + DriverUri driverUri = + DriverUri.create( + String.format( + "jdbc:flink://%s:%s", + SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetAddress(), + SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetPort()), + new Properties()); + try (FlinkConnection connection = new FlinkConnection(driverUri)) { + assertEquals("default_catalog", connection.getCatalog()); + assertEquals("default_database", connection.getSchema()); + + assertThrowsExactly( + SQLException.class, + () -> connection.setCatalog("invalid_catalog"), + "Set catalog[invalid_catalog] fail"); + assertThrowsExactly( + SQLException.class, + () -> connection.setSchema("invalid_database"), + "Set schema[invalid_database] fail"); + assertEquals("default_catalog", connection.getCatalog()); + assertEquals("default_database", connection.getSchema()); + + // Create new catalog and database + Executor executor = connection.getExecutor(); + StatementResult result = + executor.executeStatement( + "CREATE CATALOG test_catalog WITH ('type'='generic_in_memory');"); + assertTrue(result.hasNext()); + assertEquals("OK", result.next().getString(0).toString()); + connection.setCatalog("test_catalog"); + + result = executor.executeStatement("CREATE DATABASE test_database;"); + assertTrue(result.hasNext()); + assertEquals("OK", result.next().getString(0).toString()); + connection.setSchema("test_database"); + + assertEquals("test_catalog", connection.getCatalog()); + assertEquals("test_database", connection.getSchema()); + } + } + + @Test + public void testClientInfo() throws Exception { + Properties properties = new Properties(); + properties.setProperty("key3", "val3"); + DriverUri driverUri = + DriverUri.create( + String.format( + "jdbc:flink://%s:%s?key1=val1&key2=val2", + SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetAddress(), + SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetPort()), + properties); + try (FlinkConnection connection = new FlinkConnection(driverUri)) { + assertEquals("val1", connection.getClientInfo("key1")); + assertEquals("val2", connection.getClientInfo("key2")); + assertEquals("val3", connection.getClientInfo("key3")); + + connection.setClientInfo("key1", "val11"); + Properties resetProp = new Properties(); + resetProp.setProperty("key2", "val22"); + resetProp.setProperty("key3", "val33"); + resetProp.setProperty("key4", "val44"); + connection.setClientInfo(resetProp); + assertEquals("val11", connection.getClientInfo("key1")); + assertEquals("val44", connection.getClientInfo("key4")); + Properties clientInfo = connection.getClientInfo(); + assertEquals("val11", clientInfo.getProperty("key1")); + assertEquals("val22", clientInfo.getProperty("key2")); + assertEquals("val33", clientInfo.getProperty("key3")); + assertEquals("val44", clientInfo.getProperty("key4")); + } + } +} diff --git a/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkDriverTest.java b/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkDriverTest.java index 46dfb8debd0..0735af6432a 100644 --- a/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkDriverTest.java +++ b/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkDriverTest.java @@ -51,8 +51,8 @@ public class FlinkDriverTest { DriverUri driverUri = DriverUri.create(uri, properties); assertEquals("catalog_name", driverUri.getCatalog().get()); assertEquals("database_name", driverUri.getDatabase().get()); - assertEquals("localhost", driverUri.getHost()); - assertEquals(8888, driverUri.getPort()); + assertEquals("localhost", driverUri.getAddress().getHostName()); + assertEquals(8888, driverUri.getAddress().getPort()); assertEquals(5, driverUri.getProperties().size()); String uriWithoutDBUri = @@ -60,8 +60,8 @@ public class FlinkDriverTest { DriverUri driverWithoutDBUri = DriverUri.create(uriWithoutDBUri, properties); assertEquals("catalog_name", driverWithoutDBUri.getCatalog().get()); assertFalse(driverWithoutDBUri.getDatabase().isPresent()); - assertEquals("localhost", driverWithoutDBUri.getHost()); - assertEquals(8888, driverWithoutDBUri.getPort()); + assertEquals("localhost", driverWithoutDBUri.getAddress().getHostName()); + assertEquals(8888, driverWithoutDBUri.getAddress().getPort()); assertEquals(5, driverWithoutDBUri.getProperties().size()); }