Copilot commented on code in PR #6464: URL: https://github.com/apache/hive/pull/6464#discussion_r3193996788
########## standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/MetastoreDriver.java: ########## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore.datasource; + +import com.google.common.annotations.VisibleForTesting; + +import java.sql.Connection; +import java.sql.Driver; +import java.sql.DriverManager; +import java.sql.DriverPropertyInfo; +import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.List; +import java.util.Properties; +import java.util.logging.Logger; +import java.util.regex.Pattern; + +import org.apache.commons.lang3.math.NumberUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.utils.MetastoreVersionInfo; +import org.slf4j.LoggerFactory; + +import static java.sql.DriverManager.registerDriver; + +public class MetastoreDriver implements Driver { + private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(MetastoreDriver.class); + private static final String URL_PREFIX = "jdbc:metastore://"; + private static int majorVersion = -1; + private static int minorVersion = -1; + private static final Configuration configuration; + private static final Driver delegateDriver; + private static final String jdbcUrl; + static { + try { + registerDriver(new MetastoreDriver()); + String versionString = MetastoreVersionInfo.getVersion(); + String[] versionNums = versionString.split("\\."); + if (NumberUtils.isNumber(versionNums[0])) { + majorVersion = Integer.parseInt(versionNums[0]); + } + if (versionNums.length >1 && NumberUtils.isNumber(versionNums[1])) { + minorVersion = Integer.parseInt(versionNums[1]); + } + configuration = MetastoreConf.newMetastoreConf(); + jdbcUrl = MetastoreConf.getVar(configuration, MetastoreConf.ConfVars.CONNECT_URL_KEY); + delegateDriver = findRegisteredDriver(configuration); + } catch (Exception e) { + throw new RuntimeException("Failed to register Metastore driver", e); + } + } + + private static Driver findRegisteredDriver(Configuration configuration) throws SQLException { + List<Driver> candidates = new ArrayList<>(); + for (Enumeration<Driver> drivers = DriverManager.getDrivers(); drivers.hasMoreElements();) { + Driver driver = drivers.nextElement(); + try { + if (driver.acceptsURL(jdbcUrl)) { + candidates.add(driver); + } + } catch (Exception e) { + } + } + + String driverClassName = MetastoreConf.getVar(configuration, MetastoreConf.ConfVars.CONNECTION_DRIVER); + if (candidates.isEmpty()) { + Class<Driver> driverClz = tryLoadDriver(driverClassName, Thread.currentThread().getContextClassLoader(), + MetastoreDriver.class.getClassLoader()); + if (driverClz != null) { + try { + Driver driver = driverClz.getDeclaredConstructor().newInstance(); + if (!driver.acceptsURL(jdbcUrl)) { + throw new Error("Driver " + driverClassName + " cannot accept jdbcUrl"); + } + candidates.add(driver); + } catch (Exception e) { + LOG.warn("Failed to create instance of driver class {}", driverClassName, e); + } + } + } + return candidates.isEmpty() ? DriverManager.getDriver(jdbcUrl) : candidates.getFirst(); + } + + private static Class<Driver> tryLoadDriver(String driverClassName, ClassLoader... loaders) { + for (ClassLoader loader : loaders) { + if (loader != null) { + try { + return (Class<Driver>) loader.loadClass(driverClassName); + } catch (ClassNotFoundException e) { + LOG.debug("Driver class {} not found in class loader {}", driverClassName, loader); + } + } + } + return null; + } + + @Override + public Connection connect(String url, Properties info) throws SQLException { + return new MetastoreConnection(delegateDriver.connect(jdbcUrl, info), configuration); + } + + @Override + public boolean acceptsURL(String url) throws SQLException { + return Pattern.matches(URL_PREFIX + ".*", url); Review Comment: MetastoreDriver.connect() ignores the incoming JDBC URL and always delegates to the static jdbcUrl, and it never returns null for non-matching URLs. JDBC Driver implementations are expected to return null when they don’t accept the provided URL; otherwise DriverManager may select this driver for unrelated JDBC URLs and connect to the wrong backend. Consider checking acceptsURL(url) (or parsing the delegate URL from the wrapper URL/properties) and returning null when it doesn’t match, and avoid using a single static jdbcUrl for all connections. ########## standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/MetastoreDriver.java: ########## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore.datasource; + +import com.google.common.annotations.VisibleForTesting; + +import java.sql.Connection; +import java.sql.Driver; +import java.sql.DriverManager; +import java.sql.DriverPropertyInfo; +import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.List; +import java.util.Properties; +import java.util.logging.Logger; +import java.util.regex.Pattern; + +import org.apache.commons.lang3.math.NumberUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.utils.MetastoreVersionInfo; +import org.slf4j.LoggerFactory; + +import static java.sql.DriverManager.registerDriver; + +public class MetastoreDriver implements Driver { + private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(MetastoreDriver.class); + private static final String URL_PREFIX = "jdbc:metastore://"; + private static int majorVersion = -1; + private static int minorVersion = -1; + private static final Configuration configuration; + private static final Driver delegateDriver; + private static final String jdbcUrl; + static { + try { + registerDriver(new MetastoreDriver()); + String versionString = MetastoreVersionInfo.getVersion(); + String[] versionNums = versionString.split("\\."); + if (NumberUtils.isNumber(versionNums[0])) { + majorVersion = Integer.parseInt(versionNums[0]); + } + if (versionNums.length >1 && NumberUtils.isNumber(versionNums[1])) { + minorVersion = Integer.parseInt(versionNums[1]); + } + configuration = MetastoreConf.newMetastoreConf(); + jdbcUrl = MetastoreConf.getVar(configuration, MetastoreConf.ConfVars.CONNECT_URL_KEY); + delegateDriver = findRegisteredDriver(configuration); + } catch (Exception e) { + throw new RuntimeException("Failed to register Metastore driver", e); + } + } + + private static Driver findRegisteredDriver(Configuration configuration) throws SQLException { + List<Driver> candidates = new ArrayList<>(); + for (Enumeration<Driver> drivers = DriverManager.getDrivers(); drivers.hasMoreElements();) { + Driver driver = drivers.nextElement(); + try { + if (driver.acceptsURL(jdbcUrl)) { + candidates.add(driver); + } + } catch (Exception e) { + } + } + + String driverClassName = MetastoreConf.getVar(configuration, MetastoreConf.ConfVars.CONNECTION_DRIVER); + if (candidates.isEmpty()) { + Class<Driver> driverClz = tryLoadDriver(driverClassName, Thread.currentThread().getContextClassLoader(), + MetastoreDriver.class.getClassLoader()); + if (driverClz != null) { + try { + Driver driver = driverClz.getDeclaredConstructor().newInstance(); + if (!driver.acceptsURL(jdbcUrl)) { + throw new Error("Driver " + driverClassName + " cannot accept jdbcUrl"); + } + candidates.add(driver); + } catch (Exception e) { + LOG.warn("Failed to create instance of driver class {}", driverClassName, e); + } + } + } + return candidates.isEmpty() ? DriverManager.getDriver(jdbcUrl) : candidates.getFirst(); + } + + private static Class<Driver> tryLoadDriver(String driverClassName, ClassLoader... loaders) { + for (ClassLoader loader : loaders) { + if (loader != null) { + try { + return (Class<Driver>) loader.loadClass(driverClassName); + } catch (ClassNotFoundException e) { + LOG.debug("Driver class {} not found in class loader {}", driverClassName, loader); + } + } + } + return null; + } + + @Override + public Connection connect(String url, Properties info) throws SQLException { + return new MetastoreConnection(delegateDriver.connect(jdbcUrl, info), configuration); + } + + @Override + public boolean acceptsURL(String url) throws SQLException { + return Pattern.matches(URL_PREFIX + ".*", url); + } + + @Override + public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) throws SQLException { + // An empty array if no properties are required. + return new DriverPropertyInfo[0]; + } + + @Override + public int getMajorVersion() { + return majorVersion; + } + + @Override + public int getMinorVersion() { + return minorVersion; + } + + @Override + public boolean jdbcCompliant() { + return false; + } + + @Override + public Logger getParentLogger() throws SQLFeatureNotSupportedException { + return delegateDriver.getParentLogger(); + } + + public static String getMetastoreDbUrl(Configuration configuration) { + return MetastoreDriver.URL_PREFIX + "internal-delegate-url"; Review Comment: getMetastoreDbUrl(Configuration) ignores its configuration parameter and always returns a constant placeholder URL. This makes it impossible to encode/derive the delegate JDBC URL on a per-Configuration basis, and reinforces the static-jdbcUrl issue in connect(). Consider either embedding the actual delegate URL (or an identifier) into the wrapper URL, or placing it into connection Properties so MetastoreDriver.connect() can use the caller’s intended target. ########## standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/MetastoreStatement.java: ########## @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore.datasource; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Timer; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.lang.reflect.UndeclaredThrowableException; +import java.sql.Statement; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.LongAdder; + +import org.apache.commons.lang3.ClassUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.HMSHandlerContext; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.metrics.Metrics; +import org.apache.hadoop.hive.metastore.metrics.MetricsConstants; +import org.apache.hadoop.hive.metastore.utils.JavaUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.hadoop.hive.metastore.datasource.MetastoreStatement.JdbcProfilerUtils.logSlowExecution; +import static org.apache.hadoop.hive.metastore.datasource.MetastoreStatement.JdbcProfilerUtils.isSlowExecution; + +@SuppressWarnings("unchecked") +public final class MetastoreStatement implements InvocationHandler { + private static final Logger LOG = LoggerFactory.getLogger(MetastoreStatement.class); + static final String EXEC_HOOK = "metastore.jdbc.execution.hook"; + static final ThreadLocal<Pair<Pair<String, Long>, LongAdder>> CURRENT_CALL = new ThreadLocal<>(); + + private final String rawSql; + private final Statement delegate; + private final Configuration configuration; + private final MetastoreStatementHook hook; + + private MetastoreStatement(Configuration conf, Statement statement, String rawSql) { + this.configuration = Objects.requireNonNull(conf); + this.rawSql = rawSql; + this.delegate = Objects.requireNonNull(statement); + String className = conf.get(EXEC_HOOK, ""); + if (StringUtils.isEmpty(className)) { + hook = new JdbcProfilerUtils(conf); + } else { + try { + hook = JavaUtils.newInstance(JavaUtils.getClass(className.trim(), MetastoreStatementHook.class), + new Class[] { Configuration.class}, new Object[] {conf}); + } catch (MetaException e) { + throw new RuntimeException(e.getMessage(), e); + } + } + } + + public static <T extends Statement> T getProxyStatement(Configuration configuration, T delegate, String rawSql) { + MetastoreStatement handler = new MetastoreStatement(configuration, delegate, rawSql); + return (T) Proxy.newProxyInstance(JavaUtils.getClassLoader(), + ClassUtils.getAllInterfaces(delegate.getClass()).toArray(new Class[0]), handler); + } + + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + Timer.Context ctx = null; + try { + LongAdder adder = null; + boolean shouldMonitor = hook.profile(rawSql, method, args); + if (Metrics.getRegistry() != null && shouldMonitor) { + Optional<Pair<String, Long>> ctxCall = HMSHandlerContext.getCallId(); + Pair<Pair<String, Long>, LongAdder> previousCall = CURRENT_CALL.get(); + if (ctxCall.isPresent()) { + if ((previousCall == null || !ctxCall.get().equals(previousCall.getLeft()))) { + // we approach the end of previous thrift call + if (previousCall != null) { + Pair<String, Long> call = previousCall.getLeft(); + long totalSpent = previousCall.getRight().longValue(); + LOG.debug("{} took {} ms to complete all jdbc queries", call.getLeft(), totalSpent); + if (isSlowExecution(configuration, totalSpent)) { + LOG.info("{} took {} ms to complete all jdbc queries", call.getLeft(), totalSpent); + } + } + adder = new LongAdder(); + CURRENT_CALL.set(Pair.of(ctxCall.get(), adder)); + } else { + adder = previousCall.getRight(); + } + } + String metricName = hook.getMetricName(method, args); + Timer timer = Metrics.getOrCreateTimer(metricName); + if (timer != null) { + ctx = timer.time(); + } + } + long start = System.currentTimeMillis(); + hook.preRun(method, args); + Object result = method.invoke(delegate, args); + hook.postRun(method, args, result); + long timeSpent = System.currentTimeMillis() - start; + if (shouldMonitor) { + String statement = rawSql != null ? rawSql : (args != null && args.length > 0 ? (String) args[0] : "no sql found"); + LOG.debug("SQL query: {} completed in {} ms", statement, timeSpent); + } + logSlowExecution(timeSpent, configuration, rawSql, method, args); + if (adder != null) { + adder.add(timeSpent); + } + return result; + } catch (InvocationTargetException | UndeclaredThrowableException e) { + throw e.getCause(); + } finally { + if (ctx != null) { + ctx.stop(); + } + } + } + + public interface MetastoreStatementHook { + /** + * Whether should monitor the current call, this method gives a chance to profile a specific pattern of queries. + * For example, we use {@link JdbcProfilerUtils} to profile the queries originated from a set of specific APIs. + * @param sql The sql being executed, it might be null for {@link Statement#execute}, for this case + * need to obtain the sql from args, the method input. + * @param method Method which is being called + * @param args The method input + * @return true for profiling this call, false otherwise + */ + boolean profile(String sql, Method method, Object[] args); + + String getMetricName(Method method, Object[] args); + /** + * Invoked before the method call + * @param method Method which is being called + * @param args The method input + */ + default void preRun(Method method, Object[] args) { + + } + + /** + * Invoked post the method call + * @param method Method which is being called + * @param args The method input + * @param result The execution result from the call + */ + default void postRun(Method method, Object[] args, Object result) { + + } + } + + /** + * This class is used to profile the underlying statement originated from specific thrift API calls + */ + public static class JdbcProfilerUtils implements MetastoreStatement.MetastoreStatementHook { + private static final Set<String> PROFILED_APIS = new HashSet<>(); + static final Set<String> QUERY_EXECUTION = + Set.of("executeQuery", "executeUpdate", "execute", "executeBatch"); + private static volatile boolean initialized = false; + private static long logSlowQueriesThreshold; + + public JdbcProfilerUtils(Configuration configuration) { + initialize(Objects.requireNonNull(configuration)); + } + + private static void initialize(Configuration configuration) { + if (!initialized) { + synchronized (JdbcProfilerUtils.class) { + if (!initialized) { + initialized = true; + logSlowQueriesThreshold = MetastoreConf.getLongVar(configuration, + MetastoreConf.ConfVars.METASTORE_JDBC_SLOW_QUERIES); + if (logSlowQueriesThreshold > 0) { + LOG.info("The slow query log enabled, will log the query that takes more than {} ms", + logSlowQueriesThreshold); + } + String thriftApis = MetastoreConf.getVar(configuration, + MetastoreConf.ConfVars.METASTORE_PROFILE_JDBC_THRIFT_APIS); + if (StringUtils.isNotEmpty(thriftApis)) { + PROFILED_APIS.addAll(Arrays.asList(thriftApis.split(","))); Review Comment: The profiled thrift API list is parsed via thriftApis.split(",") but entries are not trimmed and empty strings are not filtered. A config like "get_table_req, get_database_req" will include a leading space and never match method names, silently disabling profiling for that API. Consider trimming each entry and skipping blanks when populating PROFILED_APIS. ########## standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/MetastoreDriver.java: ########## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore.datasource; + +import com.google.common.annotations.VisibleForTesting; + +import java.sql.Connection; +import java.sql.Driver; +import java.sql.DriverManager; +import java.sql.DriverPropertyInfo; +import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.List; +import java.util.Properties; +import java.util.logging.Logger; +import java.util.regex.Pattern; + +import org.apache.commons.lang3.math.NumberUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.utils.MetastoreVersionInfo; +import org.slf4j.LoggerFactory; + +import static java.sql.DriverManager.registerDriver; + +public class MetastoreDriver implements Driver { + private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(MetastoreDriver.class); + private static final String URL_PREFIX = "jdbc:metastore://"; + private static int majorVersion = -1; + private static int minorVersion = -1; + private static final Configuration configuration; + private static final Driver delegateDriver; + private static final String jdbcUrl; + static { + try { + registerDriver(new MetastoreDriver()); + String versionString = MetastoreVersionInfo.getVersion(); + String[] versionNums = versionString.split("\\."); + if (NumberUtils.isNumber(versionNums[0])) { + majorVersion = Integer.parseInt(versionNums[0]); + } + if (versionNums.length >1 && NumberUtils.isNumber(versionNums[1])) { + minorVersion = Integer.parseInt(versionNums[1]); + } + configuration = MetastoreConf.newMetastoreConf(); + jdbcUrl = MetastoreConf.getVar(configuration, MetastoreConf.ConfVars.CONNECT_URL_KEY); + delegateDriver = findRegisteredDriver(configuration); + } catch (Exception e) { Review Comment: The driver caches Configuration and jdbcUrl in static finals during class initialization (configuration = newMetastoreConf(); jdbcUrl = CONNECT_URL_KEY). This prevents the wrapper driver from honoring per-instance/per-test configurations or runtime connection URL updates (e.g., MetaStoreInit.updateConnectionURL) because connect() will always use the initially captured jdbcUrl. Consider passing the real delegate JDBC URL via the wrapper URL or via Properties, or otherwise avoid binding to a single static jdbcUrl at class-load time. ########## standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/datasource/TestMetastoreConnection.java: ########## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore.datasource; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Timer; +import com.zaxxer.hikari.HikariDataSource; + +import javax.sql.DataSource; +import java.lang.reflect.Method; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; + +import org.apache.commons.dbcp2.PoolingDataSource; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.derby.impl.jdbc.EmbedConnection; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.HMSHandlerContext; +import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.metrics.Metrics; +import org.apache.hadoop.hive.metastore.metrics.MetricsConstants; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(MetastoreUnitTest.class) +public class TestMetastoreConnection { + private Configuration conf; + private Counter slowQuery; + + @Before + public void init() { + conf = MetastoreDriver.getConfiguration(); + conf.set(MetastoreStatement.EXEC_HOOK, MetastoreStatementTestHook.class.getName()); + MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.METASTORE_JDBC_SLOW_QUERIES, 1000); + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED, true); + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.METASTORE_PROFILE_JDBC_EXECUTION, true); + MetastoreConf.setVar(conf, MetastoreConf.ConfVars.METASTORE_PROFILE_JDBC_THRIFT_APIS, "test_metastore_statement"); + MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CONNECTION_USER_NAME, "dummyUser"); + MetastoreConf.setVar(conf, MetastoreConf.ConfVars.PWD, "dummyPass"); + conf.unset(MetastoreConf.ConfVars.CONNECTION_POOLING_TYPE.getVarname()); + + Metrics.initialize(conf); + slowQuery = Metrics.getOrCreateCounter(MetricsConstants.JDBC_SLOW_QUERIES); + } + + @Test + public void testDefaultHikariCp() throws Exception { + MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CONNECTION_POOLING_TYPE, HikariCPDataSourceProvider.HIKARI); + + DataSourceProvider dsp = DataSourceProviderFactory.tryGetDataSourceProviderOrNull(conf); + Assert.assertNotNull(dsp); + DataSource ds = dsp.create(conf); + Assert.assertTrue(ds instanceof HikariDataSource); + verify(ds.getConnection()); + } + + @Test + public void testDbCpDataSource() throws Exception { + MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CONNECTION_POOLING_TYPE, DbCPDataSourceProvider.DBCP); + + DataSourceProvider dsp = DataSourceProviderFactory.tryGetDataSourceProviderOrNull(conf); + Assert.assertNotNull(dsp); + DataSource ds = dsp.create(conf); + Assert.assertTrue(ds instanceof PoolingDataSource); + verify(ds.getConnection()); + } + + private void verify(Connection connection) throws Exception { + Assert.assertTrue(connection.unwrap(MetastoreConnection.class).delegate() instanceof EmbedConnection); + long slowNum = slowQuery.getCount(); + Timer timer = Metrics.getOrCreateTimer(MetastoreStatementTestHook.TEST_METRIC_NAME); + Assert.assertNotNull(timer); + long timeCount = timer.getCount(); + try (AutoCloseable sleep = MetastoreStatementTestHook.testConnection("test_metastore_statement", 1500)) { + try (Statement statement = connection.createStatement(); + ResultSet rs = statement.executeQuery("VALUES 1")) { + Assert.assertTrue(rs.next()); + } + } + Assert.assertEquals(slowNum + 1, slowQuery.getCount()); + Assert.assertEquals(timeCount + 1, timer.getCount()); + Assert.assertTrue(timer.getSnapshot().getMean() > 1000); + Review Comment: Timer snapshot values are reported in nanoseconds in Dropwizard Metrics Timer. Asserting getMean() > 1000 is effectively always true for any non-trivial timing and doesn’t validate the intended ~1500ms delay. Consider asserting against TimeUnit.MILLISECONDS.toNanos(1000) (or converting the mean to milliseconds) so the test actually verifies the profiling duration. ########## standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/RetryingHMSHandler.java: ########## @@ -87,9 +91,21 @@ public Result invokeInternal(final Object proxy, final Method method, final Obje } Object object = null; boolean isStarted = Deadline.startTimer(method.getName()); + boolean clearLocal = false; try { + if (!local) { + Optional<Pair<String, Long>> previousCall = HMSHandlerContext.getCallId(); + if (previousCall.isEmpty()) { + Pair<String, Long> currentCall = Pair.of(method.getName(), System.currentTimeMillis()); + HMSHandlerContext.setCallId(currentCall); + clearLocal = true; + } + } object = method.invoke(baseHandler, args); } finally { + if (clearLocal) { + HMSHandlerContext.setCallId(null); + } Review Comment: callId is cleared in the finally block, but MetastoreStatement’s per-call aggregation (CURRENT_CALL ThreadLocal) is never flushed/cleared when the thrift call ends. As written, the “total JDBC time per thrift call” log only happens when a later JDBC call observes a different callId; the last call on a thread (or any call followed by a thrift call that executes no JDBC) may never be logged. Consider adding an explicit flush/clear when clearing callId (e.g., via a public helper on MetastoreStatement, or by moving the accumulator into HMSHandlerContext so it’s cleared with the context). ########## standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java: ########## @@ -648,6 +648,13 @@ public enum ConfVars { "hive.txn.acid.metrics.delta.pct.threshold", 0.01f, "Percentage (fractional) size of the delta files relative to the base directory. Deltas smaller than this threshold " + "count as small deltas. Default 0.01 = 1%.)"), + METASTORE_JDBC_SLOW_QUERIES("metastore.jdbc.execution.logSlowQueriesThreshold", "metastore.jdbc.execution.logSlowQueriesThreshold", + 3000, "Log the slow jdbc query that Metastore has been waiting for the result beyond the threshold(ms), " + + "should enable the metastore.profile.jdbc.execution first"), + METASTORE_PROFILE_JDBC_EXECUTION("metastore.profile.jdbc.execution", "metastore.profile.jdbc.execution", true, Review Comment: METASTORE_PROFILE_JDBC_EXECUTION defaults to true, which will change the default behavior for all metastore deployments by routing JDBC URLs through the wrapper driver and enabling the new slow-execution logging/metrics path. If this feature is intended to be opt-in profiling, consider defaulting this to false to avoid unexpected overhead/log volume and potential compatibility issues for existing deployments. ########## standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/MetastoreStatement.java: ########## @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore.datasource; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Timer; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.lang.reflect.UndeclaredThrowableException; +import java.sql.Statement; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.LongAdder; + +import org.apache.commons.lang3.ClassUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.HMSHandlerContext; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.metrics.Metrics; +import org.apache.hadoop.hive.metastore.metrics.MetricsConstants; +import org.apache.hadoop.hive.metastore.utils.JavaUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.hadoop.hive.metastore.datasource.MetastoreStatement.JdbcProfilerUtils.logSlowExecution; +import static org.apache.hadoop.hive.metastore.datasource.MetastoreStatement.JdbcProfilerUtils.isSlowExecution; + +@SuppressWarnings("unchecked") +public final class MetastoreStatement implements InvocationHandler { + private static final Logger LOG = LoggerFactory.getLogger(MetastoreStatement.class); + static final String EXEC_HOOK = "metastore.jdbc.execution.hook"; + static final ThreadLocal<Pair<Pair<String, Long>, LongAdder>> CURRENT_CALL = new ThreadLocal<>(); + + private final String rawSql; + private final Statement delegate; + private final Configuration configuration; + private final MetastoreStatementHook hook; + + private MetastoreStatement(Configuration conf, Statement statement, String rawSql) { + this.configuration = Objects.requireNonNull(conf); + this.rawSql = rawSql; + this.delegate = Objects.requireNonNull(statement); + String className = conf.get(EXEC_HOOK, ""); + if (StringUtils.isEmpty(className)) { + hook = new JdbcProfilerUtils(conf); + } else { + try { + hook = JavaUtils.newInstance(JavaUtils.getClass(className.trim(), MetastoreStatementHook.class), + new Class[] { Configuration.class}, new Object[] {conf}); + } catch (MetaException e) { + throw new RuntimeException(e.getMessage(), e); + } + } + } + + public static <T extends Statement> T getProxyStatement(Configuration configuration, T delegate, String rawSql) { + MetastoreStatement handler = new MetastoreStatement(configuration, delegate, rawSql); + return (T) Proxy.newProxyInstance(JavaUtils.getClassLoader(), + ClassUtils.getAllInterfaces(delegate.getClass()).toArray(new Class[0]), handler); + } + + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + Timer.Context ctx = null; + try { + LongAdder adder = null; + boolean shouldMonitor = hook.profile(rawSql, method, args); + if (Metrics.getRegistry() != null && shouldMonitor) { + Optional<Pair<String, Long>> ctxCall = HMSHandlerContext.getCallId(); + Pair<Pair<String, Long>, LongAdder> previousCall = CURRENT_CALL.get(); + if (ctxCall.isPresent()) { + if ((previousCall == null || !ctxCall.get().equals(previousCall.getLeft()))) { + // we approach the end of previous thrift call + if (previousCall != null) { + Pair<String, Long> call = previousCall.getLeft(); + long totalSpent = previousCall.getRight().longValue(); + LOG.debug("{} took {} ms to complete all jdbc queries", call.getLeft(), totalSpent); + if (isSlowExecution(configuration, totalSpent)) { + LOG.info("{} took {} ms to complete all jdbc queries", call.getLeft(), totalSpent); + } + } + adder = new LongAdder(); + CURRENT_CALL.set(Pair.of(ctxCall.get(), adder)); + } else { + adder = previousCall.getRight(); + } + } + String metricName = hook.getMetricName(method, args); + Timer timer = Metrics.getOrCreateTimer(metricName); + if (timer != null) { + ctx = timer.time(); + } + } + long start = System.currentTimeMillis(); + hook.preRun(method, args); + Object result = method.invoke(delegate, args); + hook.postRun(method, args, result); + long timeSpent = System.currentTimeMillis() - start; + if (shouldMonitor) { + String statement = rawSql != null ? rawSql : (args != null && args.length > 0 ? (String) args[0] : "no sql found"); + LOG.debug("SQL query: {} completed in {} ms", statement, timeSpent); + } + logSlowExecution(timeSpent, configuration, rawSql, method, args); + if (adder != null) { + adder.add(timeSpent); + } Review Comment: logSlowExecution(timeSpent, ...) is invoked for every Statement method call, not just query execution methods. This can produce “slow query” warnings for unrelated operations (e.g., close/getWarnings) and doesn’t match the ConfVar description (“slow jdbc query”). Consider gating slow-execution logging/counter updates on QUERY_EXECUTION.contains(method.getName()) (or renaming the config/message if the intent is broader than query execution). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
