libenchao commented on code in PR #22289:
URL: https://github.com/apache/flink/pull/22289#discussion_r1154400863
##
flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkConnection.java:
##
@@ -18,34 +18,64 @@
package org.apache.flink.table.jdbc;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.client.gateway.Executor;
+import org.apache.flink.table.client.gateway.StatementResult;
+import org.apache.flink.table.gateway.service.context.DefaultContext;
+import org.apache.flink.table.jdbc.utils.DriverUtils;
+
import java.sql.DatabaseMetaData;
import java.sql.SQLClientInfoException;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.Statement;
+import java.util.Collections;
import java.util.Properties;
+import java.util.UUID;
/** Connection to flink sql gateway for jdbc driver. */
public class FlinkConnection extends BaseConnection {
-private final DriverUri driverUri;
+private final Executor executor;
+private volatile boolean closed = false;
public FlinkConnection(DriverUri driverUri) {
-this.driverUri = driverUri;
+this.executor =
+Executor.create(
+new DefaultContext(
+Configuration.fromMap(
+
DriverUtils.fromProperties(driverUri.getProperties())),
+Collections.emptyList()),
+driverUri.getAddress(),
+UUID.randomUUID().toString());
+driverUri.getCatalog().ifPresent(this::setSessionCatalog);
+driverUri.getDatabase().ifPresent(this::setSessionSchema);
}
@Override
public Statement createStatement() throws SQLException {
throw new SQLFeatureNotSupportedException();
}
+Executor getExecutor() {
Review Comment:
Add a `VisibleForTesting` annotation?
##
flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkConnectionTest.java:
##
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.jdbc;
+
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.table.client.gateway.Executor;
+import org.apache.flink.table.client.gateway.SingleSessionManager;
+import org.apache.flink.table.client.gateway.StatementResult;
+import
org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointExtension;
+import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.sql.SQLException;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Tests for {@link FlinkConnection}. */
+public class FlinkConnectionTest {
+@RegisterExtension
+@Order(1)
+private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+new MiniClusterExtension(
+new MiniClusterResourceConfiguration.Builder()
+.setNumberTaskManagers(1)
+.setNumberSlotsPerTaskManager(4)
+.build());
+
+@RegisterExtension
+@Order(2)
+public static final SqlGatewayServiceExtension
SQL_GATEWAY_SERVICE_EXTENSION =
+new SqlGatewayServiceExtension(
+MINI_CLUSTER_RESOURCE::getClientConfiguration,
SingleSessionManager::new);
+
+@RegisterExtension
+@Order(3)
+private static final SqlGatewayRestEndpointExtension
SQL_GATEWAY_REST_ENDPOINT_EXTENSION =
+new
SqlGatewayRestEndpointExtension(SQL_GATEWAY_SERVICE_EXTENSION::getService);
+
+@Test
+public void testCatalogSchema() throws Exception {
+DriverUri driverUri =
+DriverUri.create(
+