This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c44d58c4c4 [FLINK-31545][jdbc-driver] Create executor in flink 
connection
8c44d58c4c4 is described below

commit 8c44d58c4c4aeb36c91d8b27f4128891970dc47d
Author: shammon FY <zjur...@gmail.com>
AuthorDate: Wed Mar 29 10:47:11 2023 +0800

    [FLINK-31545][jdbc-driver] Create executor in flink connection
    
    Close apache/flink#22289
---
 flink-table/flink-sql-jdbc-driver-bundle/pom.xml   |  28 +++++
 flink-table/flink-sql-jdbc-driver/pom.xml          |  23 +++-
 .../org/apache/flink/table/jdbc/DriverUri.java     |  15 +--
 .../apache/flink/table/jdbc/FlinkConnection.java   |  96 +++++++++++++--
 .../flink/table/jdbc/FlinkConnectionTest.java      | 136 +++++++++++++++++++++
 .../apache/flink/table/jdbc/FlinkDriverTest.java   |   8 +-
 6 files changed, 279 insertions(+), 27 deletions(-)

diff --git a/flink-table/flink-sql-jdbc-driver-bundle/pom.xml 
b/flink-table/flink-sql-jdbc-driver-bundle/pom.xml
index 9ad99ffee9f..9d43bdb9be4 100644
--- a/flink-table/flink-sql-jdbc-driver-bundle/pom.xml
+++ b/flink-table/flink-sql-jdbc-driver-bundle/pom.xml
@@ -50,15 +50,40 @@
                        <artifactId>flink-sql-gateway-api</artifactId>
                        <version>${project.version}</version>
                </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-sql-gateway</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-table-common</artifactId>
                        <version>${project.version}</version>
                </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-annotations</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <!-- Used to create Executor in connection. TODO jdbc driver 
should get rid of flink-core which is a big module. -->
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
        </dependencies>
 
        <build>
                <plugins>
+                       <plugin>
+                               <groupId>io.github.zentol.japicmp</groupId>
+                               <artifactId>japicmp-maven-plugin</artifactId>
+                               <configuration>
+                                       <!-- TODO this should be removed after 
getting rid of flink core in issue 
https://issues.apache.org/jira/browse/FLINK-31687. -->
+                                       <skip>true</skip>
+                               </configuration>
+                       </plugin>
                        <!-- Build flink-sql-gateway jar -->
                        <plugin>
                                <groupId>org.apache.maven.plugins</groupId>
@@ -77,7 +102,10 @@
                                                                        
<include>org.apache.flink:flink-sql-jdbc-driver</include>
                                                                        
<include>org.apache.flink:flink-sql-client</include>
                                                                        
<include>org.apache.flink:flink-sql-gateway-api</include>
+                                                                       
<include>org.apache.flink:flink-sql-gateway</include>
                                                                        
<include>org.apache.flink:flink-table-common</include>
+                                                                       
<include>org.apache.flink:flink-annotations</include>
+                                                                       
<include>org.apache.flink:flink-core</include>
                                                                </includes>
                                                        </artifactSet>
                                                </configuration>
diff --git a/flink-table/flink-sql-jdbc-driver/pom.xml 
b/flink-table/flink-sql-jdbc-driver/pom.xml
index 267bca001ac..3d6968fa8e4 100644
--- a/flink-table/flink-sql-jdbc-driver/pom.xml
+++ b/flink-table/flink-sql-jdbc-driver/pom.xml
@@ -48,9 +48,30 @@
                </dependency>
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-table-api-java-bridge</artifactId>
+                       <artifactId>flink-sql-gateway</artifactId>
                        <version>${project.version}</version>
                </dependency>
+
+               <!-- Test dependencies -->
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-sql-gateway</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
        </dependencies>
 
        <build>
diff --git 
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/DriverUri.java
 
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/DriverUri.java
index bb7952a2417..a13650b45e7 100644
--- 
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/DriverUri.java
+++ 
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/DriverUri.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.jdbc;
 
+import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.sql.SQLException;
@@ -38,8 +39,7 @@ import static 
org.apache.flink.table.jdbc.utils.DriverUtils.isNullOrWhitespaceOn
 public class DriverUri {
     private static final String URL_PREFIX = "jdbc:";
     private static final String URL_START = URL_PREFIX + "flink:";
-    private final String host;
-    private final int port;
+    private final InetSocketAddress address;
     private final URI uri;
 
     private final Properties properties;
@@ -53,19 +53,14 @@ public class DriverUri {
 
     private DriverUri(URI uri, Properties driverProperties) throws 
SQLException {
         this.uri = checkNotNull(uri, "uri is null");
-        this.host = uri.getHost();
-        this.port = uri.getPort();
+        this.address = new InetSocketAddress(uri.getHost(), uri.getPort());
         this.properties = mergeDynamicProperties(uri, driverProperties);
 
         initCatalogAndSchema();
     }
 
-    public String getHost() {
-        return host;
-    }
-
-    public int getPort() {
-        return port;
+    public InetSocketAddress getAddress() {
+        return address;
     }
 
     public Properties getProperties() {
diff --git 
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkConnection.java
 
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkConnection.java
index 495e75fb83a..9af04bcd7ba 100644
--- 
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkConnection.java
+++ 
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkConnection.java
@@ -18,19 +18,40 @@
 
 package org.apache.flink.table.jdbc;
 
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.client.gateway.Executor;
+import org.apache.flink.table.client.gateway.StatementResult;
+import org.apache.flink.table.gateway.service.context.DefaultContext;
+import org.apache.flink.table.jdbc.utils.DriverUtils;
+
 import java.sql.DatabaseMetaData;
 import java.sql.SQLClientInfoException;
 import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
 import java.sql.Statement;
+import java.util.Collections;
 import java.util.Properties;
+import java.util.UUID;
 
 /** Connection to flink sql gateway for jdbc driver. */
 public class FlinkConnection extends BaseConnection {
-    private final DriverUri driverUri;
+    private final Executor executor;
+    private volatile boolean closed = false;
 
     public FlinkConnection(DriverUri driverUri) {
-        this.driverUri = driverUri;
+        // TODO Support default context from map to get rid of flink core for 
jdbc driver in
+        // https://issues.apache.org/jira/browse/FLINK-31687.
+        this.executor =
+                Executor.create(
+                        new DefaultContext(
+                                Configuration.fromMap(
+                                        
DriverUtils.fromProperties(driverUri.getProperties())),
+                                Collections.emptyList()),
+                        driverUri.getAddress(),
+                        UUID.randomUUID().toString());
+        driverUri.getCatalog().ifPresent(this::setSessionCatalog);
+        driverUri.getDatabase().ifPresent(this::setSessionSchema);
     }
 
     @Override
@@ -38,14 +59,27 @@ public class FlinkConnection extends BaseConnection {
         throw new SQLFeatureNotSupportedException();
     }
 
+    @VisibleForTesting
+    Executor getExecutor() {
+        return this.executor;
+    }
+
     @Override
     public void close() throws SQLException {
-        throw new SQLFeatureNotSupportedException();
+        if (closed) {
+            return;
+        }
+        try {
+            this.executor.close();
+        } catch (Exception e) {
+            throw new SQLException("Close connection fail", e);
+        }
+        this.closed = true;
     }
 
     @Override
     public boolean isClosed() throws SQLException {
-        throw new SQLFeatureNotSupportedException();
+        return closed;
     }
 
     @Override
@@ -55,41 +89,79 @@ public class FlinkConnection extends BaseConnection {
 
     @Override
     public void setCatalog(String catalog) throws SQLException {
-        throw new SQLFeatureNotSupportedException();
+        try {
+            setSessionCatalog(catalog);
+        } catch (Exception e) {
+            throw new SQLException(String.format("Set catalog[%s] fail", 
catalog), e);
+        }
+    }
+
+    private void setSessionCatalog(String catalog) {
+        executor.configureSession(String.format("USE CATALOG %s;", catalog));
     }
 
     @Override
     public String getCatalog() throws SQLException {
-        throw new SQLFeatureNotSupportedException();
+        try (StatementResult result = executor.executeStatement("SHOW CURRENT 
CATALOG;")) {
+            if (result.hasNext()) {
+                return result.next().getString(0).toString();
+            } else {
+                throw new SQLException("No catalog");
+            }
+        }
     }
 
     @Override
     public void setClientInfo(String name, String value) throws 
SQLClientInfoException {
-        throw new SQLClientInfoException();
+        executor.configureSession(String.format("SET '%s'='%s';", name, 
value));
     }
 
     @Override
     public void setClientInfo(Properties properties) throws 
SQLClientInfoException {
-        throw new SQLClientInfoException();
+        for (Object key : properties.keySet()) {
+            setClientInfo(key.toString(), 
properties.getProperty(key.toString()));
+        }
     }
 
     @Override
     public String getClientInfo(String name) throws SQLException {
-        throw new SQLFeatureNotSupportedException();
+        // TODO Executor should return Map<String, String> here to get rid of 
flink core for jdbc
+        // driver in https://issues.apache.org/jira/browse/FLINK-31687.
+        Configuration configuration = (Configuration) 
executor.getSessionConfig();
+        return configuration.toMap().get(name);
     }
 
     @Override
     public Properties getClientInfo() throws SQLException {
-        throw new SQLFeatureNotSupportedException();
+        Properties properties = new Properties();
+        // TODO Executor should return Map<String, String> here to get rid of 
flink core for jdbc
+        // driver in https://issues.apache.org/jira/browse/FLINK-31687.
+        Configuration configuration = (Configuration) 
executor.getSessionConfig();
+        configuration.toMap().forEach(properties::setProperty);
+        return properties;
     }
 
     @Override
     public void setSchema(String schema) throws SQLException {
-        throw new SQLFeatureNotSupportedException();
+        try {
+            setSessionSchema(schema);
+        } catch (Exception e) {
+            throw new SQLException(String.format("Set schema[%s] fail", 
schema), e);
+        }
+    }
+
+    private void setSessionSchema(String schema) {
+        executor.configureSession(String.format("USE %s;", schema));
     }
 
     @Override
     public String getSchema() throws SQLException {
-        throw new SQLFeatureNotSupportedException();
+        try (StatementResult result = executor.executeStatement("SHOW CURRENT 
DATABASE;")) {
+            if (result.hasNext()) {
+                return result.next().getString(0).toString();
+            } else {
+                throw new SQLException("No database");
+            }
+        }
     }
 }
diff --git 
a/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkConnectionTest.java
 
b/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkConnectionTest.java
new file mode 100644
index 00000000000..4b543f923ce
--- /dev/null
+++ 
b/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkConnectionTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.jdbc;
+
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.table.client.gateway.Executor;
+import org.apache.flink.table.client.gateway.SingleSessionManager;
+import org.apache.flink.table.client.gateway.StatementResult;
+import 
org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointExtension;
+import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.sql.SQLException;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Tests for {@link FlinkConnection}. */
+public class FlinkConnectionTest {
+    @RegisterExtension
+    @Order(1)
+    private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(4)
+                            .build());
+
+    @RegisterExtension
+    @Order(2)
+    public static final SqlGatewayServiceExtension 
SQL_GATEWAY_SERVICE_EXTENSION =
+            new SqlGatewayServiceExtension(
+                    MINI_CLUSTER_RESOURCE::getClientConfiguration, 
SingleSessionManager::new);
+
+    @RegisterExtension
+    @Order(3)
+    private static final SqlGatewayRestEndpointExtension 
SQL_GATEWAY_REST_ENDPOINT_EXTENSION =
+            new 
SqlGatewayRestEndpointExtension(SQL_GATEWAY_SERVICE_EXTENSION::getService);
+
+    @Test
+    public void testCatalogSchema() throws Exception {
+        DriverUri driverUri =
+                DriverUri.create(
+                        String.format(
+                                "jdbc:flink://%s:%s",
+                                
SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetAddress(),
+                                
SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetPort()),
+                        new Properties());
+        try (FlinkConnection connection = new FlinkConnection(driverUri)) {
+            assertEquals("default_catalog", connection.getCatalog());
+            assertEquals("default_database", connection.getSchema());
+
+            assertThrowsExactly(
+                    SQLException.class,
+                    () -> connection.setCatalog("invalid_catalog"),
+                    "Set catalog[invalid_catalog] fail");
+            assertThrowsExactly(
+                    SQLException.class,
+                    () -> connection.setSchema("invalid_database"),
+                    "Set schema[invalid_database] fail");
+            assertEquals("default_catalog", connection.getCatalog());
+            assertEquals("default_database", connection.getSchema());
+
+            // Create new catalog and database
+            Executor executor = connection.getExecutor();
+            StatementResult result =
+                    executor.executeStatement(
+                            "CREATE CATALOG test_catalog WITH 
('type'='generic_in_memory');");
+            assertTrue(result.hasNext());
+            assertEquals("OK", result.next().getString(0).toString());
+            connection.setCatalog("test_catalog");
+
+            result = executor.executeStatement("CREATE DATABASE 
test_database;");
+            assertTrue(result.hasNext());
+            assertEquals("OK", result.next().getString(0).toString());
+            connection.setSchema("test_database");
+
+            assertEquals("test_catalog", connection.getCatalog());
+            assertEquals("test_database", connection.getSchema());
+        }
+    }
+
+    @Test
+    public void testClientInfo() throws Exception {
+        Properties properties = new Properties();
+        properties.setProperty("key3", "val3");
+        DriverUri driverUri =
+                DriverUri.create(
+                        String.format(
+                                "jdbc:flink://%s:%s?key1=val1&key2=val2",
+                                
SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetAddress(),
+                                
SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetPort()),
+                        properties);
+        try (FlinkConnection connection = new FlinkConnection(driverUri)) {
+            assertEquals("val1", connection.getClientInfo("key1"));
+            assertEquals("val2", connection.getClientInfo("key2"));
+            assertEquals("val3", connection.getClientInfo("key3"));
+
+            connection.setClientInfo("key1", "val11");
+            Properties resetProp = new Properties();
+            resetProp.setProperty("key2", "val22");
+            resetProp.setProperty("key3", "val33");
+            resetProp.setProperty("key4", "val44");
+            connection.setClientInfo(resetProp);
+            assertEquals("val11", connection.getClientInfo("key1"));
+            assertEquals("val44", connection.getClientInfo("key4"));
+            Properties clientInfo = connection.getClientInfo();
+            assertEquals("val11", clientInfo.getProperty("key1"));
+            assertEquals("val22", clientInfo.getProperty("key2"));
+            assertEquals("val33", clientInfo.getProperty("key3"));
+            assertEquals("val44", clientInfo.getProperty("key4"));
+        }
+    }
+}
diff --git 
a/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkDriverTest.java
 
b/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkDriverTest.java
index 46dfb8debd0..0735af6432a 100644
--- 
a/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkDriverTest.java
+++ 
b/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkDriverTest.java
@@ -51,8 +51,8 @@ public class FlinkDriverTest {
         DriverUri driverUri = DriverUri.create(uri, properties);
         assertEquals("catalog_name", driverUri.getCatalog().get());
         assertEquals("database_name", driverUri.getDatabase().get());
-        assertEquals("localhost", driverUri.getHost());
-        assertEquals(8888, driverUri.getPort());
+        assertEquals("localhost", driverUri.getAddress().getHostName());
+        assertEquals(8888, driverUri.getAddress().getPort());
         assertEquals(5, driverUri.getProperties().size());
 
         String uriWithoutDBUri =
@@ -60,8 +60,8 @@ public class FlinkDriverTest {
         DriverUri driverWithoutDBUri = DriverUri.create(uriWithoutDBUri, 
properties);
         assertEquals("catalog_name", driverWithoutDBUri.getCatalog().get());
         assertFalse(driverWithoutDBUri.getDatabase().isPresent());
-        assertEquals("localhost", driverWithoutDBUri.getHost());
-        assertEquals(8888, driverWithoutDBUri.getPort());
+        assertEquals("localhost", 
driverWithoutDBUri.getAddress().getHostName());
+        assertEquals(8888, driverWithoutDBUri.getAddress().getPort());
         assertEquals(5, driverWithoutDBUri.getProperties().size());
     }
 

Reply via email to