deniskuzZ commented on code in PR #4384: URL: https://github.com/apache/hive/pull/4384#discussion_r1309787885
########## standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/retryhandling/DataSourceWrapper.java: ########## @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.txn.retryhandling; + +import org.apache.hadoop.hive.metastore.DatabaseProduct; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.jdbc.core.ResultSetExtractor; +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; +import org.springframework.jdbc.core.namedparam.SqlParameterSource; +import org.springframework.jdbc.datasource.DataSourceTransactionManager; +import org.springframework.jdbc.datasource.DataSourceUtils; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.TransactionDefinition; +import org.springframework.transaction.TransactionException; +import org.springframework.transaction.TransactionStatus; + +import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.ResultSet; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import 
java.util.function.Function; + +/** + * Wraps multiple {@link DataSource}s into a single object and offers transaction management functionality. + * Allows access of the {@link NamedParameterJdbcTemplate}, {@link Connection} objects associated with the wrapped datasources. + */ +public class DataSourceWrapper { + + private static final Logger LOG = LoggerFactory.getLogger(DataSourceWrapper.class); + private static final ThreadLocal<RetryContext> threadLocal = new ThreadLocal<>(); + + private final Map<String, DataSource> dataSources; + private final Map<String, PlatformTransactionManager> transactionManagers = new HashMap<>(); + private final Map<String, NamedParameterJdbcTemplate> jdbcTemplates = new HashMap<>(); + private final DatabaseProduct databaseProduct; + + /** + * Creates a new instance of the {@link DataSourceWrapper} class + * @param dataSources A {@link Map} of the datasource names and the corresponding datasources which needs to be wrapped + * by this instance + * @param databaseProduct A {@link DatabaseProduct} instance representing the type of the underlying HMS database. + */ + public DataSourceWrapper(Map<String, DataSource> dataSources, DatabaseProduct databaseProduct) { + this.dataSources = dataSources; + for(String dataSource : dataSources.keySet()) { + NamedParameterJdbcTemplate jdbcTemplate = new NamedParameterJdbcTemplate(dataSources.get(dataSource)); + jdbcTemplates.put(dataSource, jdbcTemplate); + transactionManagers.put(dataSource, new DataSourceTransactionManager(Objects.requireNonNull(jdbcTemplate.getJdbcTemplate().getDataSource()))); + } + this.databaseProduct = databaseProduct; + } + + + /** + * Provides direct access to the {@link TransactionStatus} to allow manual savepoint creation, rollback, etc. + * Can be called only within an established {@link RetryContext}. + * @return Returns the {@link TransactionStatus} instance stored in the {@link RetryContext}. 
+ * @throws IllegalStateException Thrown when called outside a {@link RetryContext}. + */ + public TransactionStatus getTransactionStatus() { + return RetryContext.getRetryContext().transactionStatus; + } + + /** + * @return Returns true if the current {@link Thread} has an associated {@link RetryContext}, false otherwise. + */ + boolean hasRetryContext() { + return threadLocal.get() != null; + } + + /** + * @return Returns the {@link NamedParameterJdbcTemplate} associated with the current {@link RetryContext}. + * Can be called only within an established {@link RetryContext}. + * @throws IllegalStateException Thrown when called outside a {@link RetryContext}. + */ + public NamedParameterJdbcTemplate getJdbcTemplate() { + return jdbcTemplates.get(RetryContext.getRetryContext().datasource); + } + + /** + * Gets a connection to the {@link DataSource} associated with the current {@link RetryContext}. Ensures that the same + * instance is returned all the time within a particular transaction. Can be called only within an established + * {@link RetryContext}. + * @return Returns a connection to the datasource. + * @throws IllegalArgumentException Thrown when called outside a {@link RetryContext} + */ + public Connection getConnection() { + return DataSourceUtils.getConnection(dataSources.get(RetryContext.getRetryContext().datasource)); + } + + /** + * @return Returns a {@link DatabaseProduct} instance representing the type of the underlying HMS database. + */ + public DatabaseProduct getDatabaseProduct() { + return databaseProduct; + } + + /** + * <p>Establishes a {@link RetryContext} on the current {@link Thread}: Creates a new transaction with the given + * {@link TransactionDefinition} using an internal {@link PlatformTransactionManager} associated with the referred + * {@link DataSource}. Returns a {@link TransactionStatus} representing the created transaction. 
The returned + * {@link TransactionStatus} instance can be used for manual transaction handling.</p> + * <br> + * <p>!!!! Please note that if there already is a {@link RetryContext} established, this call will create a nested</p> + * {@link RetryContext} (and transaction).</b> + * @param definition The {@link TransactionDefinition} to use for creating a new transaction. + * @param dataSource The identifier of the {@link DataSource} to use + * @return Returns a {@link TransactionStatus} object representing the created transaction. + * @throws TransactionException Forwarded from {@link PlatformTransactionManager#getTransaction(TransactionDefinition)} + */ + public TransactionStatus getTransactionWithinRetryContext(TransactionDefinition definition, String dataSource) throws TransactionException { + TransactionStatus status = transactionManagers.get(dataSource).getTransaction(definition); + RetryContext.setRetryContext(status, dataSource); + return status; + } + + /** + * Commits the transaction associated with the current {@link Thread} and clears the {@link RetryContext}. + * Can be called only within an established {@link RetryContext}. + * @throws TransactionException Forwarded from {@link PlatformTransactionManager#commit(TransactionStatus)} + * @throws IllegalArgumentException Thrown when called outside a {@link RetryContext} + */ + public void commit() throws TransactionException { + try { + RetryContext context = RetryContext.getRetryContext(); + transactionManagers.get(context.datasource).commit(context.transactionStatus); + } finally { + RetryContext.clearRetryContext(); + } + } + + /** + * Rollbacks the transaction associated with the current {@link Thread} and clears the {@link RetryContext}. + * Can be called only within an established {@link RetryContext}. 
+ * @throws TransactionException Forwarded from {@link PlatformTransactionManager#rollback(TransactionStatus)} + * @throws IllegalArgumentException Thrown when called outside a {@link RetryContext} + */ + public void rollback() throws TransactionException { + try { + RetryContext context = RetryContext.getRetryContext(); + transactionManagers.get(context.datasource).rollback(context.transactionStatus); + } finally { + RetryContext.clearRetryContext(); + } + } + + /** + * Executes a {@link NamedParameterJdbcTemplate#update(String, org.springframework.jdbc.core.namedparam.SqlParameterSource)} + * calls using the query string and parameters obtained from {@link ParameterizedCommand#getParameterizedQueryString(DatabaseProduct)} and + * {@link ParameterizedCommand#getQueryParameters()} methods. Validates the resulted number of affected rows using the + * {@link ParameterizedCommand#resultPolicy()} function. + * @param command The {@link ParameterizedCommand} to execute. + * @return Returns the number of affected rows. + * @throws MetaException Forwarded from {@link ParameterizedCommand#getParameterizedQueryString(DatabaseProduct)} or + * thrown if the update count was rejected by the {@link ParameterizedCommand#resultPolicy()} method + */ + public Integer execute(ParameterizedCommand command) throws MetaException { + return execute(command.getParameterizedQueryString(getDatabaseProduct()), + command.getQueryParameters(), command.resultPolicy()); + } + + /** + * Executes a {@link NamedParameterJdbcTemplate#update(String, org.springframework.jdbc.core.namedparam.SqlParameterSource)} + * calls using the query string and {@link SqlParameterSource}. Validates the resulted number of affected rows using the + * resultpolicy function. + * @param query Parameterized query string. + * @param params Query parameters + * @param resultPolicy Result policy to use, or null, if no result policy. + * @return Returns the number of affected rows. 
+ * @throws MetaException Forwarded from {@link ParameterizedCommand#getParameterizedQueryString(DatabaseProduct)} or + * thrown if the update count was rejected by the {@link ParameterizedCommand#resultPolicy()} method + */ + public Integer execute(String query, SqlParameterSource params, + Function<Integer, Boolean> resultPolicy) throws MetaException { + LOG.debug("Going to execute command <{}>", query); + int count = getJdbcTemplate().update(query, params); + if (resultPolicy != null && !resultPolicy.apply(count)) { + LOG.error("The update count was " + count + " which is not the expected. Rolling back."); + throw new MetaException("The update count was " + count + " which is not the expected. Rolling back."); + } + LOG.debug("Command <{}> updated {} records.", query, count); + return count; + } + + /** + * Executes a {@link NamedParameterJdbcTemplate#query(String, SqlParameterSource, ResultSetExtractor)} call using the query + * string and parameters obtained from {@link QueryHandler#getParameterizedQueryString(DatabaseProduct)} and + * {@link QueryHandler#getQueryParameters()} methods. Processes the result using the {@link QueryHandler#extractData(ResultSet)} + * method ({@link QueryHandler} extends the {@link ResultSetExtractor} interface). + * @param queryHandler The {@link QueryHandler} instance containing the query, {@link SqlParameterSource}, and {@link ResultSetExtractor}. + * @return Returns with the object(s) constructed from the result of the executed query. + * @throws MetaException Forwarded from {@link ParameterizedCommand#getParameterizedQueryString(DatabaseProduct)}. 
+ */ + public <Result> Result execute(QueryHandler<Result> queryHandler) throws MetaException { + String queryStr = queryHandler.getParameterizedQueryString(getDatabaseProduct()); + LOG.debug("Going to execute query <{}>", queryStr); + SqlParameterSource params = queryHandler.getQueryParameters(); + if (params != null) { + return getJdbcTemplate().query(queryStr, params, queryHandler); + } else { + return getJdbcTemplate().query(queryStr, queryHandler); + } + } + + /** + * Represents a transaction associated with a {@link DataSource} and a {@link Thread}. Holds the {@link TransactionStatus} + * representing the transaction, and the name of the {@link DataSource} used to create the transaction. Used for two things: + * <ul> + * <li> + * Its presence tells to {@link RetryHandler} that there is no need for new (nested) context, it can use the + * existing one + * </li> + * <li> + * The {@link DataSourceWrapper#getTransactionStatus()}, {@link DataSourceWrapper#getConnection()}, + * {@link DataSourceWrapper#getJdbcTemplate()}, {@link DataSourceWrapper#commit()}, {@link DataSourceWrapper#rollback()} + * methods are using it to identify the correct datasource which is associated with the current + * {@link RetryContext} (and {@link Thread}). + * </li> + * </ul> + */ + static class RetryContext { + + /** + * Returns the previously established {@link RetryContext}. Can be called only within an established {@link RetryContext}. + * @return Returns the previously established {@link RetryContext}. + * @throws IllegalStateException Thrown when called outside a {@link RetryContext}. + */ + private static RetryContext getRetryContext() { + RetryContext context = threadLocal.get(); + if (context == null) { + throw new IllegalStateException("Trying to access a transactional resource without retry context!"); + } + return context; + } + + /** + * Establishes a {@link RetryContext} on the current thread using the given {@link TransactionStatus}. 
+ * @param status The {@link TransactionStatus} to bind to the current {@link Thread}. + * @param datasource The name of the {@link DataSource} used to create the {@link TransactionStatus}. + */ + private static void setRetryContext(TransactionStatus status, String datasource) { + RetryContext old = threadLocal.get(); + threadLocal.set(new RetryContext(status, datasource, old)); + } + + /** + * Clears a previously established {@link RetryContext} on the current {@link Thread}. + */ + private static void clearRetryContext() { + RetryContext top = threadLocal.get(); + if(top != null && top.parent != null) { Review Comment: nit: space -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
