[GitHub] [flink] libenchao commented on a diff in pull request #22289: [FLINK-31545][jdbc-driver] Create executor in flink connection

2023-04-02 Thread via GitHub


libenchao commented on code in PR #22289:
URL: https://github.com/apache/flink/pull/22289#discussion_r1155415681


##
flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkConnection.java:
##
@@ -18,34 +18,67 @@
 
 package org.apache.flink.table.jdbc;
 
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.client.gateway.Executor;
+import org.apache.flink.table.client.gateway.StatementResult;
+import org.apache.flink.table.gateway.service.context.DefaultContext;
+import org.apache.flink.table.jdbc.utils.DriverUtils;
+
 import java.sql.DatabaseMetaData;
 import java.sql.SQLClientInfoException;
 import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
 import java.sql.Statement;
+import java.util.Collections;
 import java.util.Properties;
+import java.util.UUID;
 
 /** Connection to flink sql gateway for jdbc driver. */
 public class FlinkConnection extends BaseConnection {
-private final DriverUri driverUri;
+private final Executor executor;
+private volatile boolean closed = false;
 
 public FlinkConnection(DriverUri driverUri) {
-this.driverUri = driverUri;
+// TODO Support default context from map to get gid of flink core for 
jdbc driver

Review Comment:
   It would be great if you also add the JIra link in the comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] libenchao commented on a diff in pull request #22289: [FLINK-31545][jdbc-driver] Create executor in flink connection

2023-04-02 Thread via GitHub


libenchao commented on code in PR #22289:
URL: https://github.com/apache/flink/pull/22289#discussion_r1155415681


##
flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkConnection.java:
##
@@ -18,34 +18,67 @@
 
 package org.apache.flink.table.jdbc;
 
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.client.gateway.Executor;
+import org.apache.flink.table.client.gateway.StatementResult;
+import org.apache.flink.table.gateway.service.context.DefaultContext;
+import org.apache.flink.table.jdbc.utils.DriverUtils;
+
 import java.sql.DatabaseMetaData;
 import java.sql.SQLClientInfoException;
 import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
 import java.sql.Statement;
+import java.util.Collections;
 import java.util.Properties;
+import java.util.UUID;
 
 /** Connection to flink sql gateway for jdbc driver. */
 public class FlinkConnection extends BaseConnection {
-private final DriverUri driverUri;
+private final Executor executor;
+private volatile boolean closed = false;
 
 public FlinkConnection(DriverUri driverUri) {
-this.driverUri = driverUri;
+// TODO Support default context from map to get gid of flink core for 
jdbc driver

Review Comment:
   It would be great if you also add the JIra link in the comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] libenchao commented on a diff in pull request #22289: [FLINK-31545][jdbc-driver] Create executor in flink connection

2023-03-31 Thread via GitHub


libenchao commented on code in PR #22289:
URL: https://github.com/apache/flink/pull/22289#discussion_r1154400863


##
flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkConnection.java:
##
@@ -18,34 +18,64 @@
 
 package org.apache.flink.table.jdbc;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.client.gateway.Executor;
+import org.apache.flink.table.client.gateway.StatementResult;
+import org.apache.flink.table.gateway.service.context.DefaultContext;
+import org.apache.flink.table.jdbc.utils.DriverUtils;
+
 import java.sql.DatabaseMetaData;
 import java.sql.SQLClientInfoException;
 import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
 import java.sql.Statement;
+import java.util.Collections;
 import java.util.Properties;
+import java.util.UUID;
 
 /** Connection to flink sql gateway for jdbc driver. */
 public class FlinkConnection extends BaseConnection {
-private final DriverUri driverUri;
+private final Executor executor;
+private volatile boolean closed = false;
 
 public FlinkConnection(DriverUri driverUri) {
-this.driverUri = driverUri;
+this.executor =
+Executor.create(
+new DefaultContext(
+Configuration.fromMap(
+
DriverUtils.fromProperties(driverUri.getProperties())),
+Collections.emptyList()),
+driverUri.getAddress(),
+UUID.randomUUID().toString());
+driverUri.getCatalog().ifPresent(this::setSessionCatalog);
+driverUri.getDatabase().ifPresent(this::setSessionSchema);
 }
 
 @Override
 public Statement createStatement() throws SQLException {
 throw new SQLFeatureNotSupportedException();
 }
 
+Executor getExecutor() {

Review Comment:
   Add a `VisibleForTesting` annotation?



##
flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkConnectionTest.java:
##
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.jdbc;
+
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.table.client.gateway.Executor;
+import org.apache.flink.table.client.gateway.SingleSessionManager;
+import org.apache.flink.table.client.gateway.StatementResult;
+import 
org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointExtension;
+import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.sql.SQLException;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Tests for {@link FlinkConnection}. */
+public class FlinkConnectionTest {
+@RegisterExtension
+@Order(1)
+private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+new MiniClusterExtension(
+new MiniClusterResourceConfiguration.Builder()
+.setNumberTaskManagers(1)
+.setNumberSlotsPerTaskManager(4)
+.build());
+
+@RegisterExtension
+@Order(2)
+public static final SqlGatewayServiceExtension 
SQL_GATEWAY_SERVICE_EXTENSION =
+new SqlGatewayServiceExtension(
+MINI_CLUSTER_RESOURCE::getClientConfiguration, 
SingleSessionManager::new);
+
+@RegisterExtension
+@Order(3)
+private static final SqlGatewayRestEndpointExtension 
SQL_GATEWAY_REST_ENDPOINT_EXTENSION =
+new 
SqlGatewayRestEndpointExtension(SQL_GATEWAY_SERVICE_EXTENSION::getService);
+
+@Test
+public void testCatalogSchema() throws Exception {
+DriverUri driverUri =
+DriverUri.create(
+