veghlaci05 commented on code in PR #4566:
URL: https://github.com/apache/hive/pull/4566#discussion_r1418915844


##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/StackThreadLocal.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.txn;
+
+import java.util.Stack;
+
+public class StackThreadLocal<T> {
+
+  private final ThreadLocal<Stack<T>> threadLocal = new ThreadLocal<>();
+
+  public void set(T value) {
+    Stack<T> stack = threadLocal.get();
+    if (stack == null) {
+      stack = new Stack<>();

Review Comment:
   Done



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnLockHandler.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.txn;
+
+import org.apache.hadoop.hive.common.classification.RetrySemantics;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.api.TxnOpenException;
+import org.apache.hadoop.hive.metastore.api.UnlockRequest;
+import org.apache.hadoop.hive.metastore.txn.retryhandling.SqlRetry;
+import org.springframework.transaction.annotation.Transactional;
+
+import static org.apache.hadoop.hive.metastore.txn.TxnStore.POOL_TX;
+
+public interface TxnLockHandler {

Review Comment:
   Done



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TransactionalRetryProxy.java:
##########
@@ -149,15 +164,23 @@ public Object invoke(Object proxy, Method method, 
Object[] args) throws Throwabl
           LOG.debug("Successfull method invocation within retry context: {}", 
callerId);
           return result;
         } catch (IllegalAccessException | InvocationTargetException | 
UndeclaredThrowableException e) {
-          if (e.getCause() instanceof MetaException) {
-            throw (MetaException) e.getCause();
+          if (e.getCause() instanceof TException) {
+            throw (TException) e.getCause();
           } else if (e.getCause() instanceof RuntimeException) {
             throw (RuntimeException) e.getCause();
           } else {
             throw new RuntimeException(e);
           }
-        } catch (Throwable e) {
-          throw new RuntimeException(e);
+        } catch (TException | DataAccessException e) {

Review Comment:
   I think this block is specific to this class. Can't recall similar blocks 
elsewhere.



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java:
##########
@@ -712,6 +782,7 @@ Set<CompactionInfo> findPotentialCompactions(int 
abortedThreshold, long abortedT
    * WriteSet tracking is used to ensure proper transaction isolation.  This 
method deletes the
    * transaction metadata once it becomes unnecessary.
    */
+  @Transactional(POOL_TX)

Review Comment:
   Doublechecked, and the txn pool was used for this. However I can change it.



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java:
##########
@@ -666,4 +627,60 @@ public static Character 
thriftCompactionType2DbType(CompactionType ct) throws Me
   public static String nvl(String input) {
     return input != null ? " = ? " : " IS NULL ";
   }
+
+  public static String normalizePartitionCase(String s) {

Review Comment:
   `PartitionUtils` is in hive-exec which is not a dependency of 
metastore-server.



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/impl/HiveMutex.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.txn.impl;
+
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.tools.SQLGenerator;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.jdbc.MultiDataSourceJdbcResource;
+import org.apache.hadoop.hive.metastore.txn.jdbc.TransactionContext;
+import org.apache.hadoop.hive.metastore.txn.retryhandling.SqlRetryHandler;
+import org.apache.hadoop.hive.metastore.utils.JavaUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.dao.DataAccessException;
+import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
+
+import java.sql.SQLException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
+
+import static org.apache.hadoop.hive.metastore.txn.TxnStore.POOL_MUTEX;
+import static 
org.springframework.transaction.TransactionDefinition.PROPAGATION_REQUIRED;
+
+public class HiveMutex implements TxnStore.MutexAPI {

Review Comment:
   Done



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/entities/LockInfo.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.txn.entities;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
+import org.apache.hadoop.hive.metastore.utils.JavaUtils;
+import org.apache.hadoop.hive.metastore.utils.LockTypeUtil;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+public class LockInfo {

Review Comment:
   Yeah good idea, will move them



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/impl/commands/AddWriteIdsToMinHistoryCommand.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.txn.impl.commands;

Review Comment:
   Done



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/impl/LockHandleImpl.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.txn.impl;
+
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.jdbc.MultiDataSourceJdbcResource;
+import org.apache.hadoop.hive.metastore.txn.jdbc.TransactionContext;
+import org.apache.hadoop.hive.metastore.utils.JavaUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.dao.DataAccessException;
+import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
+
+import java.util.concurrent.Semaphore;
+
+import static org.apache.hadoop.hive.metastore.txn.TxnStore.POOL_MUTEX;
+
+public final class LockHandleImpl implements TxnStore.MutexAPI.LockHandle {

Review Comment:
   Done



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/ConditionalCommand.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.txn.jdbc;
+
+import org.apache.hadoop.hive.metastore.DatabaseProduct;
+
+/**
+ * {@link ParameterizedCommand}. {@link ParameterizedBatchCommand}, and {@link 
InClauseBatchCommand} implementations can also
+ * implement this interface, marking that the execution is conditial, and the 
command wants to get notified about 
+ * execution errors. Can be used to implement commands depending on some 
feature flag(s).  
+ */
+public interface ConditionalCommand {
+
+  /**
+   * Indicates if the command should be executed or not
+   * @param databaseProduct
+   * @return Returns true if the command can be executed, false otherwise.
+   */
+  boolean shouldBeUsed(DatabaseProduct databaseProduct);

Review Comment:
   
https://github.com/apache/hive/pull/4566/files#diff-99f1624b0fe7acb8f8919dd47ae38747c1abdf98590fe96962cc1bd1bc3fe627R341



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/ProgrammaticRollbackException.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.txn.jdbc;
+
+import java.lang.reflect.Method;
+
+/**
+ * This exception can be used to trigger rollback in 
+ * {@link 
org.apache.hadoop.hive.metastore.txn.TransactionalRetryProxy#invoke(Object, 
Method, Object[])}
+ * for the current transaction, without propagating the exception to the 
caller. The proxy will catch this exception,
+ * rollback the transaction (if not yet completed already) and return the 
value supplied in the constructor to te caller.
+ */
+public class ProgrammaticRollbackException extends RuntimeException {  

Review Comment:
   Done



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/impl/functions/HeartBeatLockFunction.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.txn.impl.functions;
+
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.txn.jdbc.MultiDataSourceJdbcResource;
+import org.apache.hadoop.hive.metastore.txn.jdbc.TransactionalFunction;
+import org.apache.hadoop.hive.metastore.utils.JavaUtils;
+import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
+
+import static org.apache.hadoop.hive.metastore.txn.TxnUtils.getEpochFn;
+
+/**
+ * Heartbeats on the lock table.  This commits, so do not enter it with any 
state.
+ * Should not be called on a lock that belongs to transaction.
+ */
+public class HeartBeatLockFunction implements TransactionalFunction<Void> {

Review Comment:
   Done



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DatabaseProduct.java:
##########
@@ -546,41 +548,42 @@ public boolean supportsGetGeneratedKeys() throws 
MetaException {
     }
   }
 
-  public boolean isDuplicateKeyError(SQLException ex) {
+  public boolean isDuplicateKeyError(Throwable ex) {

Review Comment:
   done



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -648,260 +406,85 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) 
throws MetaException {
       throw new MetaException("Invalid input for number of txns: " + numTxns);
     }
 
-    try {
-      Connection dbConn = null;
-      Statement stmt = null;
-      try {
-        /*
-         * To make {@link #getOpenTxns()}/{@link #getOpenTxnsInfo()} work 
correctly, this operation must ensure
-         * that looking at the TXNS table every open transaction could be 
identified below a given High Water Mark.
-         * One way to do it, would be to serialize the openTxns call with a 
S4U lock, but that would cause
-         * performance degradation with high transaction load.
-         * To enable parallel openTxn calls, we define a time period 
(TXN_OPENTXN_TIMEOUT) and consider every
-         * transaction missing from the TXNS table in that period open, and 
prevent opening transaction outside
-         * the period.
-         * Example: At t[0] there is one open transaction in the TXNS table, 
T[1].
-         * T[2] acquires the next sequence at t[1] but only commits into the 
TXNS table at t[10].
-         * T[3] acquires its sequence at t[2], and commits into the TXNS table 
at t[3].
-         * Then T[3] calculates it’s snapshot at t[4] and puts T[1] and also 
T[2] in the snapshot’s
-         * open transaction list. T[1] because it is presented as open in TXNS,
-         * T[2] because it is a missing sequence.
-         *
-         * In the current design, there can be several metastore instances 
running in a given Warehouse.
-         * This makes ideas like reserving a range of IDs to save trips to DB 
impossible.  For example,
-         * a client may go to MS1 and start a transaction with ID 500 to 
update a particular row.
-         * Now the same client will start another transaction, except it ends 
up on MS2 and may get
-         * transaction ID 400 and update the same row.  Now the merge that 
happens to materialize the snapshot
-         * on read will thing the version of the row from transaction ID 500 
is the latest one.
-         *
-         * Longer term we can consider running Active-Passive MS (at least wrt 
to ACID operations).  This
-         * set could support a write-through cache for added performance.
-         */
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-        /*
-         * The openTxn and commitTxn must be mutexed, when committing a not 
read only transaction.
-         * This is achieved by requesting a shared table lock here, and an 
exclusive one at commit.
-         * Since table locks are working in Derby, we don't need the 
lockInternal call here.
-         * Example: Suppose we have two transactions with update like x = x+1.
-         * We have T[3,3] that was using a value from a snapshot with T[2,2]. 
If we allow committing T[3,3]
-         * and opening T[4] parallel it is possible, that T[4] will be using 
the value from a snapshot with T[2,2],
-         * and we will have a lost update problem
-         */
-        acquireTxnLock(stmt, true);
-        // Measure the time from acquiring the sequence value, till committing 
in the TXNS table
-        StopWatch generateTransactionWatch = new StopWatch();
-        generateTransactionWatch.start();
-
-        List<Long> txnIds = openTxns(dbConn, rqst);
+    /*
+     * To make {@link #getOpenTxns()}/{@link #getOpenTxnsInfo()} work 
correctly, this operation must ensure
+     * that looking at the TXNS table every open transaction could be 
identified below a given High Water Mark.
+     * One way to do it, would be to serialize the openTxns call with a S4U 
lock, but that would cause
+     * performance degradation with high transaction load.
+     * To enable parallel openTxn calls, we define a time period 
(TXN_OPENTXN_TIMEOUT) and consider every
+     * transaction missing from the TXNS table in that period open, and 
prevent opening transaction outside
+     * the period.
+     * Example: At t[0] there is one open transaction in the TXNS table, T[1].
+     * T[2] acquires the next sequence at t[1] but only commits into the TXNS 
table at t[10].
+     * T[3] acquires its sequence at t[2], and commits into the TXNS table at 
t[3].
+     * Then T[3] calculates it’s snapshot at t[4] and puts T[1] and also T[2] 
in the snapshot’s
+     * open transaction list. T[1] because it is presented as open in TXNS,
+     * T[2] because it is a missing sequence.
+     *
+     * In the current design, there can be several metastore instances running 
in a given Warehouse.
+     * This makes ideas like reserving a range of IDs to save trips to DB 
impossible.  For example,
+     * a client may go to MS1 and start a transaction with ID 500 to update a 
particular row.
+     * Now the same client will start another transaction, except it ends up 
on MS2 and may get
+     * transaction ID 400 and update the same row.  Now the merge that happens 
to materialize the snapshot
+     * on read will thing the version of the row from transaction ID 500 is 
the latest one.
+     *
+     * Longer term we can consider running Active-Passive MS (at least wrt to 
ACID operations).  This
+     * set could support a write-through cache for added performance.
+     */
+    /*
+     * The openTxn and commitTxn must be mutexed, when committing a not read 
only transaction.
+     * This is achieved by requesting a shared table lock here, and an 
exclusive one at commit.
+     * Since table locks are working in Derby, we don't need the lockInternal 
call here.
+     * Example: Suppose we have two transactions with update like x = x+1.
+     * We have T[3,3] that was using a value from a snapshot with T[2,2]. If 
we allow committing T[3,3]
+     * and opening T[4] parallel it is possible, that T[4] will be using the 
value from a snapshot with T[2,2],
+     * and we will have a lost update problem
+     */
+    acquireTxnLock(true);
+    // Measure the time from acquiring the sequence value, till committing in 
the TXNS table

Review Comment:
   No, this already existed



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -514,112 +365,19 @@ public Configuration getConf() {
   @Override
   @RetrySemantics.ReadOnly
   public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException {
-    return getOpenTxnsList(true).toOpenTxnsInfoResponse();
+    return jdbcResource.execute(new GetOpenTxnsListHandler(true, 
openTxnTimeOutMillis)).toOpenTxnsInfoResponse();
   }
 
   @Override
   @RetrySemantics.ReadOnly
   public GetOpenTxnsResponse getOpenTxns() throws MetaException {
-    return 
getOpenTxnsList(false).toOpenTxnsResponse(Arrays.asList(TxnType.READ_ONLY));
-  }
-
-  private GetOpenTxnsResponse getOpenTxns(Connection dbConn) throws 
MetaException {
-    return getOpenTxnsList(false, 
dbConn).toOpenTxnsResponse(Arrays.asList(TxnType.READ_ONLY));
+    return jdbcResource.execute(new GetOpenTxnsListHandler(false, 
openTxnTimeOutMillis)).toOpenTxnsResponse(Collections.singletonList(TxnType.READ_ONLY));

Review Comment:
   Done



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -470,28 +320,29 @@ private boolean checkIfTableIsUsable(String tableName, 
boolean configValue) thro
       // don't check it if disabled
       return false;
     }
-    Connection dbConn = null;
-    boolean tableExists = true;
+    jdbcResource.bindDataSource(POOL_TX);
     try {
-      dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-      try (Statement stmt = dbConn.createStatement()) {
-        // Dummy query to see if table exists
-        try (ResultSet rs = stmt.executeQuery("SELECT 1 FROM \"" + tableName + 
"\"")) {
-          rs.next();
-        }
-      }
-    } catch (SQLException e) {
+      jdbcResource.getJdbcTemplate().query("SELECT 1 FROM \"" + tableName + 
"\"", new MapSqlParameterSource(),
+          rs -> {
+            if (rs.next()) {

Review Comment:
   Yeah, I just modified it to use `JdbcTemplate` without analyzing it deeper. 
The result doesn't matter so I replaced the lambda with `ResultSet::next`



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnLockHandlerImpl.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.txn;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.api.TxnOpenException;
+import org.apache.hadoop.hive.metastore.api.UnlockRequest;
+import org.apache.hadoop.hive.metastore.txn.entities.LockInfo;
+import org.apache.hadoop.hive.metastore.txn.impl.functions.CheckLockFunction;
+import org.apache.hadoop.hive.metastore.txn.impl.functions.EnqueueLockFunction;
+import org.apache.hadoop.hive.metastore.txn.impl.queries.GetLocksByLockId;
+import org.apache.hadoop.hive.metastore.txn.impl.queries.ShowLocksHandler;
+import org.apache.hadoop.hive.metastore.txn.jdbc.MultiDataSourceJdbcResource;
+import org.apache.hadoop.hive.metastore.txn.jdbc.ProgrammaticRollbackException;
+import org.apache.hadoop.hive.metastore.utils.JavaUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
+
+import java.sql.Types;
+import java.util.List;
+
+import static 
org.apache.hadoop.hive.metastore.txn.entities.LockInfo.LOCK_WAITING;
+
+public class TxnLockHandlerImpl implements TxnLockHandler {

Review Comment:
   Done



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -248,199 +181,115 @@
  * the metstore call stack should have logic not to retry.  There are {@link 
RetrySemantics}
  * annotations to document the behavior.
  */
+@SuppressWarnings("SqlSourceToSinkFlow")
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
   
-  private static final String TXN_TMP_STATE = "_";
-  private static final String DEFAULT_POOL_NAME = "default";
-
-  // Lock states
-  static final protected char LOCK_ACQUIRED = 'a';
-  static final protected char LOCK_WAITING = 'w';
-
-  private static final int ALLOWED_REPEATED_DEADLOCKS = 10;
   private static final Logger LOG = 
LoggerFactory.getLogger(TxnHandler.class.getName());
 
-  private static DataSource connPool;
-  private static DataSource connPoolMutex;
-
-  private static final String MANUAL_RETRY = "ManualRetry";
-
-  // Query definitions
-  private static final String HIVE_LOCKS_INSERT_QRY = "INSERT INTO 
\"HIVE_LOCKS\" ( " +
-      "\"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", \"HL_TXNID\", \"HL_DB\", 
\"HL_TABLE\", \"HL_PARTITION\", " +
-      "\"HL_LOCK_STATE\", \"HL_LOCK_TYPE\", \"HL_LAST_HEARTBEAT\", 
\"HL_USER\", \"HL_HOST\", \"HL_AGENT_INFO\") " +
-      "VALUES (?, ?, ?, ?, ?, ?, ?, ?, %s, ?, ?, ?)";
-  private static final String TXN_COMPONENTS_INSERT_QUERY = "INSERT INTO 
\"TXN_COMPONENTS\" (" +
-      "\"TC_TXNID\", \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", 
\"TC_OPERATION_TYPE\", \"TC_WRITEID\")" +
-      " VALUES (?, ?, ?, ?, ?, ?)";
-  private static final String TXN_COMPONENTS_DP_DELETE_QUERY = "DELETE FROM 
\"TXN_COMPONENTS\" " +
-      "WHERE \"TC_TXNID\" = ? AND \"TC_DATABASE\" = ? AND \"TC_TABLE\" = ? AND 
\"TC_PARTITION\" IS NULL";
-  private static final String INCREMENT_NEXT_LOCK_ID_QUERY = "UPDATE 
\"NEXT_LOCK_ID\" SET \"NL_NEXT\" = %s";
-  private static final String UPDATE_HIVE_LOCKS_EXT_ID_QUERY = "UPDATE 
\"HIVE_LOCKS\" SET \"HL_LOCK_EXT_ID\" = %s " +
-      "WHERE \"HL_LOCK_EXT_ID\" = %s";
-  private static final String SELECT_WRITE_ID_QUERY = "SELECT \"T2W_WRITEID\" 
FROM \"TXN_TO_WRITE_ID\" WHERE" +
-      " \"T2W_DATABASE\" = ? AND \"T2W_TABLE\" = ? AND \"T2W_TXNID\" = ?";
-  private static final String COMPL_TXN_COMPONENTS_INSERT_QUERY = "INSERT INTO 
\"COMPLETED_TXN_COMPONENTS\" " +
-      "(\"CTC_TXNID\"," + " \"CTC_DATABASE\", \"CTC_TABLE\", 
\"CTC_PARTITION\", \"CTC_WRITEID\", \"CTC_UPDATE_DELETE\")" +
-      " VALUES (%s, ?, ?, ?, ?, %s)";
-  private static final String TXNS_INSERT_QRY = "INSERT INTO \"TXNS\" " +
-      "(\"TXN_STATE\", \"TXN_STARTED\", \"TXN_LAST_HEARTBEAT\", \"TXN_USER\", 
\"TXN_HOST\", \"TXN_TYPE\") " +
-      "VALUES(?,%s,%s,?,?,?)";
-  private static final String SELECT_LOCKS_FOR_LOCK_ID_QUERY = "SELECT 
\"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", " +
-      "\"HL_DB\", \"HL_TABLE\", \"HL_PARTITION\", \"HL_LOCK_STATE\", 
\"HL_LOCK_TYPE\", \"HL_TXNID\" " +
-      "FROM \"HIVE_LOCKS\" WHERE \"HL_LOCK_EXT_ID\" = ?";
-  private static final String SELECT_TIMED_OUT_LOCKS_QUERY = "SELECT DISTINCT 
\"HL_LOCK_EXT_ID\" FROM \"HIVE_LOCKS\" " +
-      "WHERE \"HL_LAST_HEARTBEAT\" < %s - :timeout AND \"HL_TXNID\" = 0";
-  private static final String TXN_TO_WRITE_ID_INSERT_QUERY = "INSERT INTO 
\"TXN_TO_WRITE_ID\" (\"T2W_TXNID\", " +
-      "\"T2W_DATABASE\", \"T2W_TABLE\", \"T2W_WRITEID\") VALUES (?, ?, ?, ?)";
-  private static final String MIN_HISTORY_WRITE_ID_INSERT_QUERY = "INSERT INTO 
\"MIN_HISTORY_WRITE_ID\" (\"MH_TXNID\", " +
-      "\"MH_DATABASE\", \"MH_TABLE\", \"MH_WRITEID\") VALUES (?, ?, ?, ?)";
-  private static final String SELECT_NWI_NEXT_FROM_NEXT_WRITE_ID =
-      "SELECT \"NWI_NEXT\" FROM \"NEXT_WRITE_ID\" WHERE \"NWI_DATABASE\" = ? 
AND \"NWI_TABLE\" = ?";
-  private static final String SELECT_METRICS_INFO_QUERY =
-      "SELECT * FROM (SELECT COUNT(*) FROM \"TXN_TO_WRITE_ID\") \"TTWID\" 
CROSS JOIN (" +
-      "SELECT COUNT(*) FROM \"COMPLETED_TXN_COMPONENTS\") \"CTC\" CROSS JOIN 
(" +
-      "SELECT COUNT(*), MIN(\"TXN_ID\"), ({0} - MIN(\"TXN_STARTED\"))/1000 
FROM \"TXNS\" WHERE \"TXN_STATE\"='" +
-        TxnStatus.OPEN + "' AND \"TXN_TYPE\" = "+ 
TxnType.REPL_CREATED.getValue() +") \"TR\" CROSS JOIN (" +
-      "SELECT COUNT(*), MIN(\"TXN_ID\"), ({0} - MIN(\"TXN_STARTED\"))/1000 
FROM \"TXNS\" WHERE \"TXN_STATE\"='" +
-        TxnStatus.OPEN + "' AND \"TXN_TYPE\" != "+ 
TxnType.REPL_CREATED.getValue() +") \"T\" CROSS JOIN (" +
-      "SELECT COUNT(*), MIN(\"TXN_ID\"), ({0} - MIN(\"TXN_STARTED\"))/1000 
FROM \"TXNS\" WHERE \"TXN_STATE\"='" +
-        TxnStatus.ABORTED + "') \"A\" CROSS JOIN (" +
-      "SELECT COUNT(*), ({0} - MIN(\"HL_ACQUIRED_AT\"))/1000 FROM 
\"HIVE_LOCKS\") \"HL\" CROSS JOIN (" +
-      "SELECT ({0} - MIN(\"CQ_COMMIT_TIME\"))/1000 from \"COMPACTION_QUEUE\" 
WHERE " +
-          "\"CQ_STATE\"=''" + READY_FOR_CLEANING + "'') OLDEST_CLEAN";
-  private static final String SELECT_TABLES_WITH_X_ABORTED_TXNS =
-      "SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" FROM 
\"TXN_COMPONENTS\" " +
-          "INNER JOIN \"TXNS\" ON \"TC_TXNID\" = \"TXN_ID\" WHERE 
\"TXN_STATE\" = " + TxnStatus.ABORTED +
-      " GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" HAVING 
COUNT(\"TXN_ID\") > ?";
-  
-  private static final String  EXCL_CTAS_ERR_MSG = 
-      "Failed to initiate a concurrent CTAS operation with the same table 
name, lockInfo : %s";
-  private static final String ZERO_WAIT_READ_ERR_MSG = "Unable to acquire read 
lock due to an existing exclusive lock {%s}";
-
-
-  protected List<TransactionalMetaStoreEventListener> transactionalListeners;
-
   // Maximum number of open transactions that's allowed
   private static volatile int maxOpenTxns = 0;
   // Whether number of open transactions reaches the threshold
   private static volatile boolean tooManyOpenTxns = false;
+  // Current number of open txns
+  private static AtomicInteger numOpenTxns;
+
+  private static volatile boolean initialized = false;
+  private static DataSource connPool;
+  private static DataSource connPoolMutex;
+  protected static DataSource connPoolCompactor;
+
+  protected static DatabaseProduct dbProduct;
+  protected static SQLGenerator sqlGenerator;
+  protected static long openTxnTimeOutMillis;
 
   /**
    * Number of consecutive deadlocks we have seen
    */
-  private int deadlockCnt;
-  private long deadlockRetryInterval;
   protected Configuration conf;
-  protected static DatabaseProduct dbProduct;
-  protected static SQLGenerator sqlGenerator;
-  private static long openTxnTimeOutMillis;
 
+  protected List<TransactionalMetaStoreEventListener> transactionalListeners;
   // (End user) Transaction timeout, in milliseconds.
   private long timeout;
   private long replicationTxnTimeout;
 
-  private int maxBatchSize;
-  private String identifierQuoteString; // quotes to use for quoting tables, 
where necessary
-  private long retryInterval;
-  private int retryLimit;
-  private int retryNum;
-  // Current number of open txns
-  private AtomicInteger numOpenTxns;
-  // Whether to use min_history_level table or not.
-  // At startup we read it from the config, but set it to false if 
min_history_level does nto exists.
-  static boolean useMinHistoryLevel;
-  static boolean useMinHistoryWriteId;
-
-  private static SqlRetryHandler sqlRetryHandler;
-  protected static MultiDataSourceJdbcResource jdbcResource;
+  private MutexAPI mutexAPI;
+  private TxnLockHandler txnLockHandler;
+  private SqlRetryHandler sqlRetryHandler;
+  protected MultiDataSourceJdbcResource jdbcResource;
 
-  /**
-   * Derby specific concurrency control
-   */
-  private static final ReentrantLock derbyLock = new ReentrantLock(true);
-  /**
-   * must be static since even in UT there may be > 1 instance of TxnHandler
-   * (e.g. via Compactor services)
-   */
-  private final static ConcurrentHashMap<String, Semaphore> derbyKey2Lock = 
new ConcurrentHashMap<>();
   private static final String hostname = JavaUtils.hostname();
 
-  // Private methods should never catch SQLException and then throw 
MetaException.  The public
-  // methods depend on SQLException coming back so they can detect and handle 
deadlocks.  Private
-  // methods should only throw MetaException when they explicitly know there's 
a logic error and
-  // they want to throw past the public methods.
-  //
-  // All public methods that write to the database have to check for deadlocks 
when a SQLException
-  // comes back and handle it if they see one.  This has to be done with the 
connection pooling
-  // in mind.  To do this they should call checkRetryable() AFTER rolling back 
the db transaction,
-  // and then they should catch RetryException and call themselves 
recursively. See commitTxn for an example.
-
   public TxnHandler() {
   }
 
   /**
    * This is logically part of c'tor and must be called prior to any other 
method.
    * Not physically part of c'tor due to use of reflection
    */
-  public void setConf(Configuration conf){
+  public void setConf(Configuration conf) {
     this.conf = conf;
 
-    int maxPoolSize = MetastoreConf.getIntVar(conf, 
ConfVars.CONNECTION_POOLING_MAX_CONNECTIONS);
-    synchronized (TxnHandler.class) {
-      try (DataSourceProvider.DataSourceNameConfigurator configurator =
-               new DataSourceProvider.DataSourceNameConfigurator(conf, 
POOL_TX)) {
-        if (connPool == null) {
-          connPool = setupJdbcConnectionPool(conf, maxPoolSize);
-        }
-        if (connPoolMutex == null) {
-          configurator.resetName(POOL_MUTEX);
-          connPoolMutex = setupJdbcConnectionPool(conf, maxPoolSize);
-        }
-      }
-      if (dbProduct == null) {
-        try (Connection dbConn = 
getDbConn(Connection.TRANSACTION_READ_COMMITTED)) {
-          determineDatabaseProduct(dbConn);
-        } catch (SQLException e) {
-          LOG.error("Unable to determine database product", e);
-          throw new RuntimeException(e);
+    if (!initialized) {
+      synchronized (TxnHandler.class) {
+        if (!initialized) {
+          try (DataSourceProvider.DataSourceNameConfigurator configurator =
+                   new DataSourceProvider.DataSourceNameConfigurator(conf, 
POOL_TX)) {
+            int maxPoolSize = MetastoreConf.getIntVar(conf, 
ConfVars.CONNECTION_POOLING_MAX_CONNECTIONS);
+            if (connPool == null) {
+              connPool = setupJdbcConnectionPool(conf, maxPoolSize);
+            }
+            if (connPoolMutex == null) {
+              configurator.resetName(POOL_MUTEX);
+              connPoolMutex = setupJdbcConnectionPool(conf, maxPoolSize);
+            }
+            if (connPoolCompactor == null) {
+              configurator.resetName(POOL_COMPACTOR);
+              connPoolCompactor = setupJdbcConnectionPool(conf,
+                  MetastoreConf.getIntVar(conf, 
ConfVars.HIVE_COMPACTOR_CONNECTION_POOLING_MAX_CONNECTIONS));
+            }            
+          }
+          if (dbProduct == null) {
+            try (Connection dbConn = 
getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPool)) {
+              determineDatabaseProduct(dbConn);
+            } catch (SQLException e) {
+              LOG.error("Unable to determine database product", e);
+              throw new RuntimeException(e);
+            }
+          }
+          if (sqlGenerator == null) {
+            sqlGenerator = new SQLGenerator(dbProduct, conf);
+          }
+          initialized = true;
         }
       }
+    }
 
-      if (sqlGenerator == null) {
-        sqlGenerator = new SQLGenerator(dbProduct, conf);
-      }
-      
-      if (jdbcResource == null) {
-        jdbcResource = new MultiDataSourceJdbcResource(dbProduct);
-        jdbcResource.registerDataSource(POOL_TX, connPool);
-        jdbcResource.registerDataSource(POOL_MUTEX, connPoolMutex);
-      }      
+    if (jdbcResource == null) {
+      jdbcResource = new MultiDataSourceJdbcResource(dbProduct, conf, 
sqlGenerator);
+      jdbcResource.registerDataSource(POOL_TX, connPool);
+      jdbcResource.registerDataSource(POOL_MUTEX, connPoolMutex);
+      jdbcResource.registerDataSource(POOL_COMPACTOR, connPoolCompactor);
     }
+    
+    mutexAPI = new HiveMutex(sqlGenerator, jdbcResource);
 
     numOpenTxns = Metrics.getOrCreateGauge(MetricsConstants.NUM_OPEN_TXNS);
 
     timeout = MetastoreConf.getTimeVar(conf, ConfVars.TXN_TIMEOUT, 
TimeUnit.MILLISECONDS);
     replicationTxnTimeout = MetastoreConf.getTimeVar(conf, 
ConfVars.REPL_TXN_TIMEOUT, TimeUnit.MILLISECONDS);
-    retryInterval = MetastoreConf.getTimeVar(conf, 
ConfVars.HMS_HANDLER_INTERVAL,
-        TimeUnit.MILLISECONDS);
-    retryLimit = MetastoreConf.getIntVar(conf, ConfVars.HMS_HANDLER_ATTEMPTS);
-    deadlockRetryInterval = retryInterval / 10;
     maxOpenTxns = MetastoreConf.getIntVar(conf, ConfVars.MAX_OPEN_TXNS);
-    maxBatchSize = MetastoreConf.getIntVar(conf, ConfVars.JDBC_MAX_BATCH_SIZE);
-
     openTxnTimeOutMillis = MetastoreConf.getTimeVar(conf, 
ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS);
     
     try {
-      useMinHistoryWriteId = checkIfTableIsUsable("MIN_HISTORY_WRITE_ID", 
-        MetastoreConf.getBoolVar(conf, ConfVars.TXN_USE_MIN_HISTORY_WRITE_ID));
-      
-      // override the config if table does not exists anymore
+      
TxnHandlingFeatures.setUseMinHistoryWriteId(checkIfTableIsUsable("MIN_HISTORY_WRITE_ID",
 
+              MetastoreConf.getBoolVar(conf, 
ConfVars.TXN_USE_MIN_HISTORY_WRITE_ID))

Review Comment:
   Did the formatting. Moving it would require to pass `jdbcResource` as well, 
I think it is easier to set up the fields as a part of TxnHandler init.



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/retryhandling/SqlRetryHandler.java:
##########
@@ -123,32 +124,29 @@ public <Result> Result 
executeWithRetry(SqlRetryCallProperties properties, SqlRe
       if (properties.isLockInternally()) {
         lockInternal();
       }
-      threadLocal.set(new ContextNode<>(threadLocal.get(), new Object()));
+      threadLocal.set(new Object());

Review Comment:
   In `SqlRetryHandler` the context doesn't hold any information. It's just a 
marker if we already have an established context or not. The reason it's not a 
boolean flag that theoretically there can be any level of nesting which we need 
to keep track of.



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -457,7 +306,8 @@ public void setConf(Configuration conf){
       throw new RuntimeException(e);
     }
 
-    sqlRetryHandler = new SqlRetryHandler(conf, 
jdbcResource.getDatabaseProduct());    
+    sqlRetryHandler = new SqlRetryHandler(conf, 
jdbcResource.getDatabaseProduct());
+    txnLockHandler = TransactionalRetryProxy.getProxy(new 
TxnLockHandlerImpl(jdbcResource), sqlRetryHandler, jdbcResource);

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to