[GitHub] [flink] FangYongs commented on a diff in pull request #22289: [FLINK-31545][jdbc-driver] Create executor in flink connection
FangYongs commented on code in PR #22289:
URL: https://github.com/apache/flink/pull/22289#discussion_r1155438551

## flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkConnection.java: ##
@@ -18,34 +18,67 @@
 package org.apache.flink.table.jdbc;

+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.client.gateway.Executor;
+import org.apache.flink.table.client.gateway.StatementResult;
+import org.apache.flink.table.gateway.service.context.DefaultContext;
+import org.apache.flink.table.jdbc.utils.DriverUtils;
+
 import java.sql.DatabaseMetaData;
 import java.sql.SQLClientInfoException;
 import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
 import java.sql.Statement;
+import java.util.Collections;
 import java.util.Properties;
+import java.util.UUID;

 /** Connection to flink sql gateway for jdbc driver. */
 public class FlinkConnection extends BaseConnection {
-    private final DriverUri driverUri;
+    private final Executor executor;
+    private volatile boolean closed = false;

     public FlinkConnection(DriverUri driverUri) {
-        this.driverUri = driverUri;
+        // TODO Support default context from map to get gid of flink core for jdbc driver

Review Comment:
Thanks @libenchao, added the jira link

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[GitHub] [flink] FangYongs commented on a diff in pull request #22289: [FLINK-31545][jdbc-driver] Create executor in flink connection
FangYongs commented on code in PR #22289:
URL: https://github.com/apache/flink/pull/22289#discussion_r1155231475

## flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkConnection.java: ##
@@ -55,41 +85,75 @@ public DatabaseMetaData getMetaData() throws SQLException {
     @Override
     public void setCatalog(String catalog) throws SQLException {
-        throw new SQLFeatureNotSupportedException();
+        try {
+            setSessionCatalog(catalog);
+        } catch (Exception e) {
+            throw new SQLException(String.format("Set catalog[%s] fail", catalog), e);
+        }
+    }
+
+    private void setSessionCatalog(String catalog) {
+        executor.configureSession(String.format("USE CATALOG %s;", catalog));
     }

     @Override
     public String getCatalog() throws SQLException {
-        throw new SQLFeatureNotSupportedException();
+        try (StatementResult result = executor.executeStatement("SHOW CURRENT CATALOG;")) {
+            if (result.hasNext()) {
+                return result.next().getString(0).toString();
+            } else {
+                throw new SQLException("No catalog");
+            }
+        }
     }

     @Override
     public void setClientInfo(String name, String value) throws SQLClientInfoException {
-        throw new SQLClientInfoException();
+        executor.configureSession(String.format("SET '%s'='%s';", name, value));
     }

     @Override
     public void setClientInfo(Properties properties) throws SQLClientInfoException {
-        throw new SQLClientInfoException();
+        for (Object key : properties.keySet()) {
+            setClientInfo(key.toString(), properties.getProperty(key.toString()));
+        }
     }

     @Override
     public String getClientInfo(String name) throws SQLException {
-        throw new SQLFeatureNotSupportedException();
+        Configuration configuration = (Configuration) executor.getSessionConfig();

Review Comment:
I agree with you. I think `executor.getSessionConfig()` should return the config as a type with no extra dependencies, such as a `Map`. I will add a `TODO` here and fix it in https://issues.apache.org/jira/browse/FLINK-31687. What do you think?
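The idea in the comment above, returning the session config as a plain `Map` so the driver does not have to cast to Flink's `Configuration`, could look roughly like the sketch below. This is only an illustration of the direction deferred to FLINK-31687, not code from the PR: `MapBackedExecutor`, `getSessionConfigMap()` and `ClientInfoSketch` are hypothetical names invented here, and the real `Executor` interface is not shown.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the executor; NOT the real Flink Executor interface. */
interface MapBackedExecutor {
    /** Assumed to expose the session options as plain string pairs. */
    Map<String, String> getSessionConfigMap();
}

/** Sketch of a client-info lookup that needs no class from flink-core. */
final class ClientInfoSketch {
    private final MapBackedExecutor executor;

    ClientInfoSketch(MapBackedExecutor executor) {
        this.executor = executor;
    }

    /** Returns the option value, or null when the option has not been set. */
    String getClientInfo(String name) {
        return executor.getSessionConfigMap().get(name);
    }

    public static void main(String[] args) {
        // Illustrative session options; the key and value are sample data only.
        Map<String, String> options = new HashMap<>();
        options.put("pipeline.name", "demo");

        ClientInfoSketch sketch = new ClientInfoSketch(() -> options);
        System.out.println(sketch.getClientInfo("pipeline.name"));
        System.out.println(sketch.getClientInfo("not.set"));
    }
}
```

With a `Map<String, String>` return type the driver only depends on `java.util`, which is the property the comment is after; whether the follow-up issue takes this exact shape is not stated in the thread.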
[GitHub] [flink] FangYongs commented on a diff in pull request #22289: [FLINK-31545][jdbc-driver] Create executor in flink connection
FangYongs commented on code in PR #22289:
URL: https://github.com/apache/flink/pull/22289#discussion_r1155231101

## flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkConnectionTest.java: ##
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.jdbc;
+
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.table.client.gateway.Executor;
+import org.apache.flink.table.client.gateway.SingleSessionManager;
+import org.apache.flink.table.client.gateway.StatementResult;
+import org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointExtension;
+import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.sql.SQLException;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Tests for {@link FlinkConnection}. */
+public class FlinkConnectionTest {
+    @RegisterExtension
+    @Order(1)
+    private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(4)
+                            .build());
+
+    @RegisterExtension
+    @Order(2)
+    public static final SqlGatewayServiceExtension SQL_GATEWAY_SERVICE_EXTENSION =
+            new SqlGatewayServiceExtension(
+                    MINI_CLUSTER_RESOURCE::getClientConfiguration, SingleSessionManager::new);
+
+    @RegisterExtension
+    @Order(3)
+    private static final SqlGatewayRestEndpointExtension SQL_GATEWAY_REST_ENDPOINT_EXTENSION =
+            new SqlGatewayRestEndpointExtension(SQL_GATEWAY_SERVICE_EXTENSION::getService);
+
+    @Test
+    public void testCatalogSchema() throws Exception {
+        DriverUri driverUri =
+                DriverUri.create(
+                        String.format(
+                                "jdbc:flink://%s:%s",
+                                SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetAddress(),
+                                SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetPort()),
+                        new Properties());
+        try (FlinkConnection connection = new FlinkConnection(driverUri)) {
+            assertEquals("default_catalog", connection.getCatalog());
+            assertEquals("default_database", connection.getSchema());
+
+            assertThrowsExactly(
+                    SQLException.class,
+                    () -> connection.setCatalog("invalid_catalog"),
+                    "Set catalog[invalid_catalog] fail");
+            assertThrowsExactly(
+                    SQLException.class,
+                    () -> connection.setSchema("invalid_database"),
+                    "Set schema[invalid_database] fail");
+            assertEquals("default_catalog", connection.getCatalog());
+            assertEquals("default_database", connection.getSchema());

Review Comment:
These two lines check that the current catalog and database are left unchanged after the failed attempts to set an invalid catalog and an invalid database.
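The check described in the comment above (a failed switch must not change the current catalog) can be shown in isolation with a toy class. The sketch below is plain Java with no gateway and no JUnit; `ToyCatalogConnection` and `FailedSetCatalogCheck` are hypothetical names, and only the wrap-in-`SQLException` pattern and the message format are taken from the `setCatalog` diff in this thread.

```java
import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Toy stand-in for a connection; NOT FlinkConnection and not backed by a gateway. */
final class ToyCatalogConnection {
    private final Set<String> knownCatalogs;
    private String currentCatalog;

    ToyCatalogConnection(String initialCatalog, String... otherCatalogs) {
        this.knownCatalogs = new HashSet<>(Arrays.asList(otherCatalogs));
        this.knownCatalogs.add(initialCatalog);
        this.currentCatalog = initialCatalog;
    }

    /** Switches catalog; any failure is wrapped in SQLException, as in the PR's setCatalog. */
    void setCatalog(String catalog) throws SQLException {
        try {
            if (!knownCatalogs.contains(catalog)) {
                throw new IllegalArgumentException("Unknown catalog: " + catalog);
            }
            currentCatalog = catalog;
        } catch (Exception e) {
            throw new SQLException(String.format("Set catalog[%s] fail", catalog), e);
        }
    }

    String getCatalog() {
        return currentCatalog;
    }
}

final class FailedSetCatalogCheck {
    /**
     * Tries to switch to a catalog that is expected to be invalid, verifies that the
     * current catalog did not change, and returns the SQLException message.
     */
    static String messageOfFailedSwitch(ToyCatalogConnection connection, String badCatalog) {
        String before = connection.getCatalog();
        try {
            connection.setCatalog(badCatalog);
        } catch (SQLException e) {
            if (!before.equals(connection.getCatalog())) {
                throw new IllegalStateException("Catalog was changed by a failed switch");
            }
            return e.getMessage();
        }
        throw new IllegalStateException("Expected SQLException for " + badCatalog);
    }

    public static void main(String[] args) {
        ToyCatalogConnection connection = new ToyCatalogConnection("default_catalog");
        System.out.println(messageOfFailedSwitch(connection, "invalid_catalog"));
        System.out.println(connection.getCatalog());
    }
}
```

One related point for the test in the diff: in JUnit 5, the `String` passed as the third argument of `assertThrowsExactly(Class, Executable, String)` is the failure message reported when the assertion itself fails, not an expected exception message. If the intent is to verify the text of the `SQLException`, the exception returned by `assertThrowsExactly` would need a separate `assertEquals` on `getMessage()`.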
[GitHub] [flink] FangYongs commented on a diff in pull request #22289: [FLINK-31545][jdbc-driver] Create executor in flink connection
FangYongs commented on code in PR #22289:
URL: https://github.com/apache/flink/pull/22289#discussion_r1155230946

## flink-table/flink-sql-jdbc-driver-bundle/pom.xml: ##
@@ -77,7 +89,9 @@
 org.apache.flink:flink-sql-jdbc-driver
 org.apache.flink:flink-sql-client
 org.apache.flink:flink-sql-gateway-api
+org.apache.flink:flink-sql-gateway
 org.apache.flink:flink-table-common
+org.apache.flink:flink-core

Review Comment:
I have created an issue, https://issues.apache.org/jira/browse/FLINK-31687, to get rid of flink-core once the main functions of the JDBC driver are complete.