veghlaci05 commented on code in PR #4566: URL: https://github.com/apache/hive/pull/4566#discussion_r1418915844
########## standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/StackThreadLocal.java: ########## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.txn; + +import java.util.Stack; + +public class StackThreadLocal<T> { + + private final ThreadLocal<Stack<T>> threadLocal = new ThreadLocal<>(); + + public void set(T value) { + Stack<T> stack = threadLocal.get(); + if (stack == null) { + stack = new Stack<>(); Review Comment: Done ########## standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnLockHandler.java: ########## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.metastore.txn; + +import org.apache.hadoop.hive.common.classification.RetrySemantics; +import org.apache.hadoop.hive.metastore.api.LockRequest; +import org.apache.hadoop.hive.metastore.api.LockResponse; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchLockException; +import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; +import org.apache.hadoop.hive.metastore.api.ShowLocksRequest; +import org.apache.hadoop.hive.metastore.api.ShowLocksResponse; +import org.apache.hadoop.hive.metastore.api.TxnAbortedException; +import org.apache.hadoop.hive.metastore.api.TxnOpenException; +import org.apache.hadoop.hive.metastore.api.UnlockRequest; +import org.apache.hadoop.hive.metastore.txn.retryhandling.SqlRetry; +import org.springframework.transaction.annotation.Transactional; + +import static org.apache.hadoop.hive.metastore.txn.TxnStore.POOL_TX; + +public interface TxnLockHandler { Review Comment: Done ########## standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TransactionalRetryProxy.java: ########## @@ -149,15 +164,23 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl LOG.debug("Successfull method invocation within retry context: {}", callerId); return result; } catch (IllegalAccessException | InvocationTargetException | UndeclaredThrowableException e) { - if (e.getCause() instanceof MetaException) { - throw (MetaException) e.getCause(); + if (e.getCause() instanceof TException) { + throw (TException) e.getCause(); } else if (e.getCause() instanceof RuntimeException) { throw (RuntimeException) e.getCause(); } else { throw new RuntimeException(e); } - } catch (Throwable e) { - throw new RuntimeException(e); + } catch (TException | DataAccessException e) { Review Comment: I think this block is specific to this class. Can't recall similar blocks elsewhere. ########## standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java: ########## @@ -712,6 +782,7 @@ Set<CompactionInfo> findPotentialCompactions(int abortedThreshold, long abortedT * WriteSet tracking is used to ensure proper transaction isolation. This method deletes the * transaction metadata once it becomes unnecessary. */ + @Transactional(POOL_TX) Review Comment: Doublechecked, and the txn pool was used for this. However I can change it. ########## standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java: ########## @@ -666,4 +627,60 @@ public static Character thriftCompactionType2DbType(CompactionType ct) throws Me public static String nvl(String input) { return input != null ? " = ? " : " IS NULL "; } + + public static String normalizePartitionCase(String s) { Review Comment: `PartitionUtils` is in hive-exec which is not a dependency of metastore-server. ########## standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/impl/HiveMutex.java: ########## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.txn.impl; + +import org.apache.commons.lang3.NotImplementedException; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.tools.SQLGenerator; +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.jdbc.MultiDataSourceJdbcResource; +import org.apache.hadoop.hive.metastore.txn.jdbc.TransactionContext; +import org.apache.hadoop.hive.metastore.txn.retryhandling.SqlRetryHandler; +import org.apache.hadoop.hive.metastore.utils.JavaUtils; +import org.apache.hadoop.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.dao.DataAccessException; +import org.springframework.jdbc.core.namedparam.MapSqlParameterSource; + +import java.sql.SQLException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Semaphore; + +import static org.apache.hadoop.hive.metastore.txn.TxnStore.POOL_MUTEX; +import static org.springframework.transaction.TransactionDefinition.PROPAGATION_REQUIRED; + +public class HiveMutex implements TxnStore.MutexAPI { Review Comment: Done ########## standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/entities/LockInfo.java: ########## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.txn.entities; + +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.hadoop.hive.metastore.api.LockState; +import org.apache.hadoop.hive.metastore.api.LockType; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement; +import org.apache.hadoop.hive.metastore.utils.JavaUtils; +import org.apache.hadoop.hive.metastore.utils.LockTypeUtil; + +import java.sql.ResultSet; +import java.sql.SQLException; + +public class LockInfo { Review Comment: Yeah good idea, will move them ########## standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/impl/commands/AddWriteIdsToMinHistoryCommand.java: ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.txn.impl.commands; Review Comment: Done ########## standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/impl/LockHandleImpl.java: ########## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.txn.impl; + +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.jdbc.MultiDataSourceJdbcResource; +import org.apache.hadoop.hive.metastore.txn.jdbc.TransactionContext; +import org.apache.hadoop.hive.metastore.utils.JavaUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.dao.DataAccessException; +import org.springframework.jdbc.core.namedparam.MapSqlParameterSource; + +import java.util.concurrent.Semaphore; + +import static org.apache.hadoop.hive.metastore.txn.TxnStore.POOL_MUTEX; + +public final class LockHandleImpl implements TxnStore.MutexAPI.LockHandle { Review Comment: Done ########## standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/ConditionalCommand.java: ########## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.metastore.txn.jdbc; + +import org.apache.hadoop.hive.metastore.DatabaseProduct; + +/** + * {@link ParameterizedCommand}. {@link ParameterizedBatchCommand}, and {@link InClauseBatchCommand} implementations can also + * implement this interface, marking that the execution is conditial, and the command wants to get notified about + * execution errors. Can be used to implement commands depending on some feature flag(s). + */ +public interface ConditionalCommand { + + /** + * Indicates if the command should be executed or not + * @param databaseProduct + * @return Returns true if the command can be executed, false otherwise. + */ + boolean shouldBeUsed(DatabaseProduct databaseProduct); Review Comment: https://github.com/apache/hive/pull/4566/files#diff-99f1624b0fe7acb8f8919dd47ae38747c1abdf98590fe96962cc1bd1bc3fe627R341 ########## standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/ProgrammaticRollbackException.java: ########## @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.txn.jdbc; + +import java.lang.reflect.Method; + +/** + * This exception can be used to trigger rollback in + * {@link org.apache.hadoop.hive.metastore.txn.TransactionalRetryProxy#invoke(Object, Method, Object[])} + * for the current transaction, without propagating the exception to the caller. The proxy will catch this exception, + * rollback the transaction (if not yet completed already) and return the value supplied in the constructor to te caller. + */ +public class ProgrammaticRollbackException extends RuntimeException { Review Comment: Done ########## standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/impl/functions/HeartBeatLockFunction.java: ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.metastore.txn.impl.functions; + +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchLockException; +import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; +import org.apache.hadoop.hive.metastore.api.TxnAbortedException; +import org.apache.hadoop.hive.metastore.txn.jdbc.MultiDataSourceJdbcResource; +import org.apache.hadoop.hive.metastore.txn.jdbc.TransactionalFunction; +import org.apache.hadoop.hive.metastore.utils.JavaUtils; +import org.springframework.jdbc.core.namedparam.MapSqlParameterSource; + +import static org.apache.hadoop.hive.metastore.txn.TxnUtils.getEpochFn; + +/** + * Heartbeats on the lock table. This commits, so do not enter it with any state. + * Should not be called on a lock that belongs to transaction. + */ +public class HeartBeatLockFunction implements TransactionalFunction<Void> { Review Comment: Done ########## standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DatabaseProduct.java: ########## @@ -546,41 +548,42 @@ public boolean supportsGetGeneratedKeys() throws MetaException { } } - public boolean isDuplicateKeyError(SQLException ex) { + public boolean isDuplicateKeyError(Throwable ex) { Review Comment: done ########## standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java: ########## @@ -648,260 +406,85 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { throw new MetaException("Invalid input for number of txns: " + numTxns); } - try { - Connection dbConn = null; - Statement stmt = null; - try { - /* - * To make {@link #getOpenTxns()}/{@link #getOpenTxnsInfo()} work correctly, this operation must ensure - * that looking at the TXNS table every open transaction could be identified below a given High Water Mark. - * One way to do it, would be to serialize the openTxns call with a S4U lock, but that would cause - * performance degradation with high transaction load. - * To enable parallel openTxn calls, we define a time period (TXN_OPENTXN_TIMEOUT) and consider every - * transaction missing from the TXNS table in that period open, and prevent opening transaction outside - * the period. - * Example: At t[0] there is one open transaction in the TXNS table, T[1]. - * T[2] acquires the next sequence at t[1] but only commits into the TXNS table at t[10]. - * T[3] acquires its sequence at t[2], and commits into the TXNS table at t[3]. - * Then T[3] calculates it’s snapshot at t[4] and puts T[1] and also T[2] in the snapshot’s - * open transaction list. T[1] because it is presented as open in TXNS, - * T[2] because it is a missing sequence. - * - * In the current design, there can be several metastore instances running in a given Warehouse. - * This makes ideas like reserving a range of IDs to save trips to DB impossible. For example, - * a client may go to MS1 and start a transaction with ID 500 to update a particular row. - * Now the same client will start another transaction, except it ends up on MS2 and may get - * transaction ID 400 and update the same row. Now the merge that happens to materialize the snapshot - * on read will thing the version of the row from transaction ID 500 is the latest one. - * - * Longer term we can consider running Active-Passive MS (at least wrt to ACID operations). This - * set could support a write-through cache for added performance. 
- */ - dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - stmt = dbConn.createStatement(); - /* - * The openTxn and commitTxn must be mutexed, when committing a not read only transaction. - * This is achieved by requesting a shared table lock here, and an exclusive one at commit. - * Since table locks are working in Derby, we don't need the lockInternal call here. - * Example: Suppose we have two transactions with update like x = x+1. - * We have T[3,3] that was using a value from a snapshot with T[2,2]. If we allow committing T[3,3] - * and opening T[4] parallel it is possible, that T[4] will be using the value from a snapshot with T[2,2], - * and we will have a lost update problem - */ - acquireTxnLock(stmt, true); - // Measure the time from acquiring the sequence value, till committing in the TXNS table - StopWatch generateTransactionWatch = new StopWatch(); - generateTransactionWatch.start(); - - List<Long> txnIds = openTxns(dbConn, rqst); + /* + * To make {@link #getOpenTxns()}/{@link #getOpenTxnsInfo()} work correctly, this operation must ensure + * that looking at the TXNS table every open transaction could be identified below a given High Water Mark. + * One way to do it, would be to serialize the openTxns call with a S4U lock, but that would cause + * performance degradation with high transaction load. + * To enable parallel openTxn calls, we define a time period (TXN_OPENTXN_TIMEOUT) and consider every + * transaction missing from the TXNS table in that period open, and prevent opening transaction outside + * the period. + * Example: At t[0] there is one open transaction in the TXNS table, T[1]. + * T[2] acquires the next sequence at t[1] but only commits into the TXNS table at t[10]. + * T[3] acquires its sequence at t[2], and commits into the TXNS table at t[3]. + * Then T[3] calculates it’s snapshot at t[4] and puts T[1] and also T[2] in the snapshot’s + * open transaction list. T[1] because it is presented as open in TXNS, + * T[2] because it is a missing sequence. + * + * In the current design, there can be several metastore instances running in a given Warehouse. + * This makes ideas like reserving a range of IDs to save trips to DB impossible. For example, + * a client may go to MS1 and start a transaction with ID 500 to update a particular row. + * Now the same client will start another transaction, except it ends up on MS2 and may get + * transaction ID 400 and update the same row. Now the merge that happens to materialize the snapshot + * on read will thing the version of the row from transaction ID 500 is the latest one. + * + * Longer term we can consider running Active-Passive MS (at least wrt to ACID operations). This + * set could support a write-through cache for added performance. + */ + /* + * The openTxn and commitTxn must be mutexed, when committing a not read only transaction. + * This is achieved by requesting a shared table lock here, and an exclusive one at commit. + * Since table locks are working in Derby, we don't need the lockInternal call here. + * Example: Suppose we have two transactions with update like x = x+1. + * We have T[3,3] that was using a value from a snapshot with T[2,2]. 
If we allow committing T[3,3] + * and opening T[4] parallel it is possible, that T[4] will be using the value from a snapshot with T[2,2], + * and we will have a lost update problem + */ + acquireTxnLock(true); + // Measure the time from acquiring the sequence value, till committing in the TXNS table Review Comment: No, this already existed ########## standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java: ########## @@ -514,112 +365,19 @@ public Configuration getConf() { @Override @RetrySemantics.ReadOnly public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException { - return getOpenTxnsList(true).toOpenTxnsInfoResponse(); + return jdbcResource.execute(new GetOpenTxnsListHandler(true, openTxnTimeOutMillis)).toOpenTxnsInfoResponse(); } @Override @RetrySemantics.ReadOnly public GetOpenTxnsResponse getOpenTxns() throws MetaException { - return getOpenTxnsList(false).toOpenTxnsResponse(Arrays.asList(TxnType.READ_ONLY)); - } - - private GetOpenTxnsResponse getOpenTxns(Connection dbConn) throws MetaException { - return getOpenTxnsList(false, dbConn).toOpenTxnsResponse(Arrays.asList(TxnType.READ_ONLY)); + return jdbcResource.execute(new GetOpenTxnsListHandler(false, openTxnTimeOutMillis)).toOpenTxnsResponse(Collections.singletonList(TxnType.READ_ONLY)); Review Comment: Done ########## standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java: ########## @@ -470,28 +320,29 @@ private boolean checkIfTableIsUsable(String tableName, boolean configValue) thro // don't check it if disabled return false; } - Connection dbConn = null; - boolean tableExists = true; + jdbcResource.bindDataSource(POOL_TX); try { - dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - try (Statement stmt = dbConn.createStatement()) { - // Dummy query to see if table exists - try (ResultSet rs = stmt.executeQuery("SELECT 1 FROM \"" + tableName + "\"")) { - rs.next(); - } - } - } catch (SQLException e) { + jdbcResource.getJdbcTemplate().query("SELECT 1 FROM \"" + tableName + "\"", new MapSqlParameterSource(), + rs -> { + if (rs.next()) { Review Comment: Yeah, I just modified it to use `JdbcTemplate` without analyzing it deeper. The result doesn't matter so I replaced the lambda with `ResultSet::next` ########## standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnLockHandlerImpl.java: ########## @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.metastore.txn; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.hadoop.hive.metastore.api.LockRequest; +import org.apache.hadoop.hive.metastore.api.LockResponse; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchLockException; +import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; +import org.apache.hadoop.hive.metastore.api.ShowLocksRequest; +import org.apache.hadoop.hive.metastore.api.ShowLocksResponse; +import org.apache.hadoop.hive.metastore.api.TxnAbortedException; +import org.apache.hadoop.hive.metastore.api.TxnOpenException; +import org.apache.hadoop.hive.metastore.api.UnlockRequest; +import org.apache.hadoop.hive.metastore.txn.entities.LockInfo; +import org.apache.hadoop.hive.metastore.txn.impl.functions.CheckLockFunction; +import org.apache.hadoop.hive.metastore.txn.impl.functions.EnqueueLockFunction; +import org.apache.hadoop.hive.metastore.txn.impl.queries.GetLocksByLockId; +import org.apache.hadoop.hive.metastore.txn.impl.queries.ShowLocksHandler; +import org.apache.hadoop.hive.metastore.txn.jdbc.MultiDataSourceJdbcResource; +import org.apache.hadoop.hive.metastore.txn.jdbc.ProgrammaticRollbackException; +import org.apache.hadoop.hive.metastore.utils.JavaUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.jdbc.core.namedparam.MapSqlParameterSource; + +import java.sql.Types; +import java.util.List; + +import static org.apache.hadoop.hive.metastore.txn.entities.LockInfo.LOCK_WAITING; + +public class TxnLockHandlerImpl implements TxnLockHandler { Review Comment: Done ########## standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java: ########## @@ -248,199 +181,115 @@ * the metstore call stack should have logic not to retry. There are {@link RetrySemantics} * annotations to document the behavior. */ +@SuppressWarnings("SqlSourceToSinkFlow") @InterfaceAudience.Private @InterfaceStability.Evolving abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { - private static final String TXN_TMP_STATE = "_"; - private static final String DEFAULT_POOL_NAME = "default"; - - // Lock states - static final protected char LOCK_ACQUIRED = 'a'; - static final protected char LOCK_WAITING = 'w'; - - private static final int ALLOWED_REPEATED_DEADLOCKS = 10; private static final Logger LOG = LoggerFactory.getLogger(TxnHandler.class.getName()); - private static DataSource connPool; - private static DataSource connPoolMutex; - - private static final String MANUAL_RETRY = "ManualRetry"; - - // Query definitions - private static final String HIVE_LOCKS_INSERT_QRY = "INSERT INTO \"HIVE_LOCKS\" ( " + - "\"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", \"HL_TXNID\", \"HL_DB\", \"HL_TABLE\", \"HL_PARTITION\", " + - "\"HL_LOCK_STATE\", \"HL_LOCK_TYPE\", \"HL_LAST_HEARTBEAT\", \"HL_USER\", \"HL_HOST\", \"HL_AGENT_INFO\") " + - "VALUES (?, ?, ?, ?, ?, ?, ?, ?, %s, ?, ?, ?)"; - private static final String TXN_COMPONENTS_INSERT_QUERY = "INSERT INTO \"TXN_COMPONENTS\" (" + - "\"TC_TXNID\", \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", \"TC_OPERATION_TYPE\", \"TC_WRITEID\")" + - " VALUES (?, ?, ?, ?, ?, ?)"; - private static final String TXN_COMPONENTS_DP_DELETE_QUERY = "DELETE FROM \"TXN_COMPONENTS\" " + - "WHERE \"TC_TXNID\" = ? AND \"TC_DATABASE\" = ? AND \"TC_TABLE\" = ? 
AND \"TC_PARTITION\" IS NULL"; - private static final String INCREMENT_NEXT_LOCK_ID_QUERY = "UPDATE \"NEXT_LOCK_ID\" SET \"NL_NEXT\" = %s"; - private static final String UPDATE_HIVE_LOCKS_EXT_ID_QUERY = "UPDATE \"HIVE_LOCKS\" SET \"HL_LOCK_EXT_ID\" = %s " + - "WHERE \"HL_LOCK_EXT_ID\" = %s"; - private static final String SELECT_WRITE_ID_QUERY = "SELECT \"T2W_WRITEID\" FROM \"TXN_TO_WRITE_ID\" WHERE" + - " \"T2W_DATABASE\" = ? AND \"T2W_TABLE\" = ? AND \"T2W_TXNID\" = ?"; - private static final String COMPL_TXN_COMPONENTS_INSERT_QUERY = "INSERT INTO \"COMPLETED_TXN_COMPONENTS\" " + - "(\"CTC_TXNID\"," + " \"CTC_DATABASE\", \"CTC_TABLE\", \"CTC_PARTITION\", \"CTC_WRITEID\", \"CTC_UPDATE_DELETE\")" + - " VALUES (%s, ?, ?, ?, ?, %s)"; - private static final String TXNS_INSERT_QRY = "INSERT INTO \"TXNS\" " + - "(\"TXN_STATE\", \"TXN_STARTED\", \"TXN_LAST_HEARTBEAT\", \"TXN_USER\", \"TXN_HOST\", \"TXN_TYPE\") " + - "VALUES(?,%s,%s,?,?,?)"; - private static final String SELECT_LOCKS_FOR_LOCK_ID_QUERY = "SELECT \"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", " + - "\"HL_DB\", \"HL_TABLE\", \"HL_PARTITION\", \"HL_LOCK_STATE\", \"HL_LOCK_TYPE\", \"HL_TXNID\" " + - "FROM \"HIVE_LOCKS\" WHERE \"HL_LOCK_EXT_ID\" = ?"; - private static final String SELECT_TIMED_OUT_LOCKS_QUERY = "SELECT DISTINCT \"HL_LOCK_EXT_ID\" FROM \"HIVE_LOCKS\" " + - "WHERE \"HL_LAST_HEARTBEAT\" < %s - :timeout AND \"HL_TXNID\" = 0"; - private static final String TXN_TO_WRITE_ID_INSERT_QUERY = "INSERT INTO \"TXN_TO_WRITE_ID\" (\"T2W_TXNID\", " + - "\"T2W_DATABASE\", \"T2W_TABLE\", \"T2W_WRITEID\") VALUES (?, ?, ?, ?)"; - private static final String MIN_HISTORY_WRITE_ID_INSERT_QUERY = "INSERT INTO \"MIN_HISTORY_WRITE_ID\" (\"MH_TXNID\", " + - "\"MH_DATABASE\", \"MH_TABLE\", \"MH_WRITEID\") VALUES (?, ?, ?, ?)"; - private static final String SELECT_NWI_NEXT_FROM_NEXT_WRITE_ID = - "SELECT \"NWI_NEXT\" FROM \"NEXT_WRITE_ID\" WHERE \"NWI_DATABASE\" = ? 
AND \"NWI_TABLE\" = ?"; - private static final String SELECT_METRICS_INFO_QUERY = - "SELECT * FROM (SELECT COUNT(*) FROM \"TXN_TO_WRITE_ID\") \"TTWID\" CROSS JOIN (" + - "SELECT COUNT(*) FROM \"COMPLETED_TXN_COMPONENTS\") \"CTC\" CROSS JOIN (" + - "SELECT COUNT(*), MIN(\"TXN_ID\"), ({0} - MIN(\"TXN_STARTED\"))/1000 FROM \"TXNS\" WHERE \"TXN_STATE\"='" + - TxnStatus.OPEN + "' AND \"TXN_TYPE\" = "+ TxnType.REPL_CREATED.getValue() +") \"TR\" CROSS JOIN (" + - "SELECT COUNT(*), MIN(\"TXN_ID\"), ({0} - MIN(\"TXN_STARTED\"))/1000 FROM \"TXNS\" WHERE \"TXN_STATE\"='" + - TxnStatus.OPEN + "' AND \"TXN_TYPE\" != "+ TxnType.REPL_CREATED.getValue() +") \"T\" CROSS JOIN (" + - "SELECT COUNT(*), MIN(\"TXN_ID\"), ({0} - MIN(\"TXN_STARTED\"))/1000 FROM \"TXNS\" WHERE \"TXN_STATE\"='" + - TxnStatus.ABORTED + "') \"A\" CROSS JOIN (" + - "SELECT COUNT(*), ({0} - MIN(\"HL_ACQUIRED_AT\"))/1000 FROM \"HIVE_LOCKS\") \"HL\" CROSS JOIN (" + - "SELECT ({0} - MIN(\"CQ_COMMIT_TIME\"))/1000 from \"COMPACTION_QUEUE\" WHERE " + - "\"CQ_STATE\"=''" + READY_FOR_CLEANING + "'') OLDEST_CLEAN"; - private static final String SELECT_TABLES_WITH_X_ABORTED_TXNS = - "SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" FROM \"TXN_COMPONENTS\" " + - "INNER JOIN \"TXNS\" ON \"TC_TXNID\" = \"TXN_ID\" WHERE \"TXN_STATE\" = " + TxnStatus.ABORTED + - " GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" HAVING COUNT(\"TXN_ID\") > ?"; - - private static final String EXCL_CTAS_ERR_MSG = - "Failed to initiate a concurrent CTAS operation with the same table name, lockInfo : %s"; - private static final String ZERO_WAIT_READ_ERR_MSG = "Unable to acquire read lock due to an existing exclusive lock {%s}"; - - - protected List<TransactionalMetaStoreEventListener> transactionalListeners; - // Maximum number of open transactions that's allowed private static volatile int maxOpenTxns = 0; // Whether number of open transactions reaches the threshold private static volatile boolean tooManyOpenTxns = false; + // Current number of open txns + private static AtomicInteger numOpenTxns; + + private static volatile boolean initialized = false; + private static DataSource connPool; + private static DataSource connPoolMutex; + protected static DataSource connPoolCompactor; + + protected static DatabaseProduct dbProduct; + protected static SQLGenerator sqlGenerator; + protected static long openTxnTimeOutMillis; /** * Number of consecutive deadlocks we have seen */ - private int deadlockCnt; - private long deadlockRetryInterval; protected Configuration conf; - protected static DatabaseProduct dbProduct; - protected static SQLGenerator sqlGenerator; - private static long openTxnTimeOutMillis; + protected List<TransactionalMetaStoreEventListener> transactionalListeners; // (End user) Transaction timeout, in milliseconds. private long timeout; private long replicationTxnTimeout; - private int maxBatchSize; - private String identifierQuoteString; // quotes to use for quoting tables, where necessary - private long retryInterval; - private int retryLimit; - private int retryNum; - // Current number of open txns - private AtomicInteger numOpenTxns; - // Whether to use min_history_level table or not. - // At startup we read it from the config, but set it to false if min_history_level does nto exists. 
- static boolean useMinHistoryLevel; - static boolean useMinHistoryWriteId; - - private static SqlRetryHandler sqlRetryHandler; - protected static MultiDataSourceJdbcResource jdbcResource; + private MutexAPI mutexAPI; + private TxnLockHandler txnLockHandler; + private SqlRetryHandler sqlRetryHandler; + protected MultiDataSourceJdbcResource jdbcResource; - /** - * Derby specific concurrency control - */ - private static final ReentrantLock derbyLock = new ReentrantLock(true); - /** - * must be static since even in UT there may be > 1 instance of TxnHandler - * (e.g. via Compactor services) - */ - private final static ConcurrentHashMap<String, Semaphore> derbyKey2Lock = new ConcurrentHashMap<>(); private static final String hostname = JavaUtils.hostname(); - // Private methods should never catch SQLException and then throw MetaException. The public - // methods depend on SQLException coming back so they can detect and handle deadlocks. Private - // methods should only throw MetaException when they explicitly know there's a logic error and - // they want to throw past the public methods. - // - // All public methods that write to the database have to check for deadlocks when a SQLException - // comes back and handle it if they see one. This has to be done with the connection pooling - // in mind. To do this they should call checkRetryable() AFTER rolling back the db transaction, - // and then they should catch RetryException and call themselves recursively. See commitTxn for an example. - public TxnHandler() { } /** * This is logically part of c'tor and must be called prior to any other method. * Not physically part of c'tor due to use of reflection */ - public void setConf(Configuration conf){ + public void setConf(Configuration conf) { this.conf = conf; - int maxPoolSize = MetastoreConf.getIntVar(conf, ConfVars.CONNECTION_POOLING_MAX_CONNECTIONS); - synchronized (TxnHandler.class) { - try (DataSourceProvider.DataSourceNameConfigurator configurator = - new DataSourceProvider.DataSourceNameConfigurator(conf, POOL_TX)) { - if (connPool == null) { - connPool = setupJdbcConnectionPool(conf, maxPoolSize); - } - if (connPoolMutex == null) { - configurator.resetName(POOL_MUTEX); - connPoolMutex = setupJdbcConnectionPool(conf, maxPoolSize); - } - } - if (dbProduct == null) { - try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED)) { - determineDatabaseProduct(dbConn); - } catch (SQLException e) { - LOG.error("Unable to determine database product", e); - throw new RuntimeException(e); + if (!initialized) { + synchronized (TxnHandler.class) { + if (!initialized) { + try (DataSourceProvider.DataSourceNameConfigurator configurator = + new DataSourceProvider.DataSourceNameConfigurator(conf, POOL_TX)) { + int maxPoolSize = MetastoreConf.getIntVar(conf, ConfVars.CONNECTION_POOLING_MAX_CONNECTIONS); + if (connPool == null) { + connPool = setupJdbcConnectionPool(conf, maxPoolSize); + } + if (connPoolMutex == null) { + configurator.resetName(POOL_MUTEX); + connPoolMutex = setupJdbcConnectionPool(conf, maxPoolSize); + } + if (connPoolCompactor == null) { + configurator.resetName(POOL_COMPACTOR); + connPoolCompactor = setupJdbcConnectionPool(conf, + MetastoreConf.getIntVar(conf, ConfVars.HIVE_COMPACTOR_CONNECTION_POOLING_MAX_CONNECTIONS)); + } + } + if (dbProduct == null) { + try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPool)) { + determineDatabaseProduct(dbConn); + } catch (SQLException e) { + LOG.error("Unable to determine database product", e); + throw new 
RuntimeException(e); + } + } + if (sqlGenerator == null) { + sqlGenerator = new SQLGenerator(dbProduct, conf); + } + initialized = true; } } + } - if (sqlGenerator == null) { - sqlGenerator = new SQLGenerator(dbProduct, conf); - } - - if (jdbcResource == null) { - jdbcResource = new MultiDataSourceJdbcResource(dbProduct); - jdbcResource.registerDataSource(POOL_TX, connPool); - jdbcResource.registerDataSource(POOL_MUTEX, connPoolMutex); - } + if (jdbcResource == null) { + jdbcResource = new MultiDataSourceJdbcResource(dbProduct, conf, sqlGenerator); + jdbcResource.registerDataSource(POOL_TX, connPool); + jdbcResource.registerDataSource(POOL_MUTEX, connPoolMutex); + jdbcResource.registerDataSource(POOL_COMPACTOR, connPoolCompactor); } + + mutexAPI = new HiveMutex(sqlGenerator, jdbcResource); numOpenTxns = Metrics.getOrCreateGauge(MetricsConstants.NUM_OPEN_TXNS); timeout = MetastoreConf.getTimeVar(conf, ConfVars.TXN_TIMEOUT, TimeUnit.MILLISECONDS); replicationTxnTimeout = MetastoreConf.getTimeVar(conf, ConfVars.REPL_TXN_TIMEOUT, TimeUnit.MILLISECONDS); - retryInterval = MetastoreConf.getTimeVar(conf, ConfVars.HMS_HANDLER_INTERVAL, - TimeUnit.MILLISECONDS); - retryLimit = MetastoreConf.getIntVar(conf, ConfVars.HMS_HANDLER_ATTEMPTS); - deadlockRetryInterval = retryInterval / 10; maxOpenTxns = MetastoreConf.getIntVar(conf, ConfVars.MAX_OPEN_TXNS); - maxBatchSize = MetastoreConf.getIntVar(conf, ConfVars.JDBC_MAX_BATCH_SIZE); - openTxnTimeOutMillis = MetastoreConf.getTimeVar(conf, ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS); try { - useMinHistoryWriteId = checkIfTableIsUsable("MIN_HISTORY_WRITE_ID", - MetastoreConf.getBoolVar(conf, ConfVars.TXN_USE_MIN_HISTORY_WRITE_ID)); - - // override the config if table does not exists anymore + TxnHandlingFeatures.setUseMinHistoryWriteId(checkIfTableIsUsable("MIN_HISTORY_WRITE_ID", + MetastoreConf.getBoolVar(conf, ConfVars.TXN_USE_MIN_HISTORY_WRITE_ID)) Review Comment: Did the formatting. Moving it would require to pass `jdbcResource` as well, I think it is easier to set up the fields as a part of TxnHandler init. ########## standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/retryhandling/SqlRetryHandler.java: ########## @@ -123,32 +124,29 @@ public <Result> Result executeWithRetry(SqlRetryCallProperties properties, SqlRe if (properties.isLockInternally()) { lockInternal(); } - threadLocal.set(new ContextNode<>(threadLocal.get(), new Object())); + threadLocal.set(new Object()); Review Comment: In `SqlRetryHandler` the context doesn't hold any information. It's just a marker if we already have an established context or not. The reason it's not a boolean flag that theoretically there can be any level of nesting which we need to keep track of. ########## standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java: ########## @@ -457,7 +306,8 @@ public void setConf(Configuration conf){ throw new RuntimeException(e); } - sqlRetryHandler = new SqlRetryHandler(conf, jdbcResource.getDatabaseProduct()); + sqlRetryHandler = new SqlRetryHandler(conf, jdbcResource.getDatabaseProduct()); + txnLockHandler = TransactionalRetryProxy.getProxy(new TxnLockHandlerImpl(jdbcResource), sqlRetryHandler, jdbcResource); Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
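For readers following the `StackThreadLocal` hunk at the top of this thread together with the `SqlRetryHandler` reply above ("the context doesn't hold any information... there can be any level of nesting which we need to keep track of"): the idea is a stack-backed thread-local used purely as a nesting marker for retry contexts. A minimal, self-contained sketch of that idea follows. Only `set(T)` is visible in the quoted diff, so the class name and the `unset()`/`get()`/`isSet()` shapes below are assumptions for illustration, not the actual Hive implementation.

```java
import java.util.Stack;

// Hypothetical sketch of a stack-backed thread-local nesting marker
// (names and methods beyond set(T) are assumed, not taken from the PR).
public class NestedContextThreadLocal<T> {

  private final ThreadLocal<Stack<T>> threadLocal = new ThreadLocal<>();

  /** Pushes a value, opening one more level of nesting for the current thread. */
  public void set(T value) {
    Stack<T> stack = threadLocal.get();
    if (stack == null) {
      stack = new Stack<>();
      threadLocal.set(stack);
    }
    stack.push(value);
  }

  /** Pops the innermost value; clears the ThreadLocal once the outermost level is closed. */
  public void unset() {
    Stack<T> stack = threadLocal.get();
    stack.pop();
    if (stack.isEmpty()) {
      threadLocal.remove();
    }
  }

  /** Returns the innermost value, or null when no context is established. */
  public T get() {
    Stack<T> stack = threadLocal.get();
    return (stack == null || stack.isEmpty()) ? null : stack.peek();
  }

  /** True when the current thread already has an established context. */
  public boolean isSet() {
    return threadLocal.get() != null;
  }
}
```

The stack depth is what replaces a simple boolean flag: each nested retry scope pushes on entry and pops on exit, so the marker survives arbitrary nesting and is fully cleaned up when the outermost scope ends.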
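The `checkIfTableIsUsable` hunk above probes whether a table (here `MIN_HISTORY_WRITE_ID`) is present by running a dummy `SELECT 1` through Spring's named-parameter JDBC template, and the reply notes that the row contents are irrelevant, which is why a bare `ResultSet::next` callback is enough. Below is a standalone sketch of that probe pattern with hypothetical class, method, and field names; the real patch's database-specific error classification is omitted here.

```java
import java.sql.ResultSet;

import javax.sql.DataSource;

import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.ResultSetExtractor;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;

public class TableUsabilityProbe {

  // The extractor's return value is discarded; issuing the statement is the whole point.
  private static final ResultSetExtractor<Boolean> CONSUME_ONE_ROW = ResultSet::next;

  /** Returns true if a dummy SELECT against the table succeeds, false if the query fails. */
  public static boolean isTableUsable(DataSource dataSource, String tableName) {
    NamedParameterJdbcTemplate jdbcTemplate = new NamedParameterJdbcTemplate(dataSource);
    try {
      jdbcTemplate.query("SELECT 1 FROM \"" + tableName + "\"",
          new MapSqlParameterSource(), CONSUME_ONE_ROW);
      return true;
    } catch (DataAccessException e) {
      // In the real patch the failure is inspected further; in this sketch any failure
      // simply reports the table as unusable so the feature backed by it can be disabled.
      return false;
    }
  }
}
```

The point of `ResultSet::next` in the reply is only that any result-consuming callback will do: success or failure of the statement is the signal, not the data it returns.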
