veghlaci05 commented on code in PR #4384:
URL: https://github.com/apache/hive/pull/4384#discussion_r1310142264


##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -2575,11 +2601,17 @@ protected long getMinOpenTxnIdWaterMark(Connection 
dbConn) throws MetaException,
           minOpenTxn = Long.MAX_VALUE;
         }
       }
+    } catch (SQLException e) {
+      throw new UncategorizedSQLException(null, null, e);
+    }
+    jdbcTemplate.getTransactionWithinRetryContext(new 
DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_REQUIRED), 
POOL_TX);
+    try {
+      long lowWaterMark = getOpenTxnTimeoutLowBoundaryTxnId();

Review Comment:
   Fixed.



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -1603,21 +1645,24 @@ public void commitTxn(CommitTxnRequest rqst) throws 
NoSuchTxnException, TxnAbort
         }
 
         if (!isHiveReplTxn) {
-          createCommitNotificationEvent(dbConn, txnid , txnType);
+          createCommitNotificationEvent(jdbcTemplate, txnid , txnType);
         }
 
         LOG.debug("Going to commit");
-        dbConn.commit();
+        jdbcTemplate.commit();
 
         if (MetastoreConf.getBoolVar(conf, 
MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON)) {
           
Metrics.getOrCreateCounter(MetricsConstants.TOTAL_NUM_COMMITTED_TXNS).inc();
         }
       } catch (SQLException e) {
         LOG.debug("Going to rollback: ", e);
-        rollbackDBConn(dbConn);
+        jdbcTemplate.rollback();

Review Comment:
   It's also the DataSourceWrapper object, just renamed it to jdbcWrapper 
according to one of your previous comments.



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java:
##########
@@ -79,8 +79,8 @@ public class CompactionInfo implements 
Comparable<CompactionInfo> {
   public Set<Long> writeIds;
   public boolean hasUncompactedAborts;
 
-  byte[] metaInfo;
-  String hadoopJobId;
+  public byte[] metaInfo;
+  public String hadoopJobId;

Review Comment:
   I think not; we discussed with @deniskuzZ that CompactionInfo should not be 
touched as part of this PR.



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -5755,80 +5807,86 @@ private void timeOutLocks(Connection dbConn) {
    */
   @RetrySemantics.Idempotent
   public void performTimeOuts() {
-    Connection dbConn = null;
-    Statement stmt = null;
-    ResultSet rs = null;
     try {
-      dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-      //We currently commit after selecting the TXNS to abort.  So whether 
SERIALIZABLE
-      //READ_COMMITTED, the effect is the same.  We could use FOR UPDATE on 
Select from TXNS
-      //and do the whole performTimeOuts() in a single huge transaction, but 
the only benefit
-      //would be to make sure someone cannot heartbeat one of these txns at 
the same time.
-      //The attempt to heartbeat would block and fail immediately after it's 
unblocked.
-      //With current (RC + multiple txns) implementation it is possible for 
someone to send
-      //heartbeat at the very end of the expire interval, and just after the 
Select from TXNS
-      //is made, in which case heartbeat will succeed but txn will still be 
Aborted.
-      //Solving this corner case is not worth the perf penalty.  The client 
should heartbeat in a
-      //timely way.
-      timeOutLocks(dbConn);
-      while(true) {
-        stmt = dbConn.createStatement();
-        String s = " \"TXN_ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = " + 
TxnStatus.OPEN +
-          " AND (" +
-                "\"TXN_TYPE\" != " + TxnType.REPL_CREATED.getValue() +
-                " AND \"TXN_LAST_HEARTBEAT\" <  " + getEpochFn(dbProduct) + 
"-" + timeout +
-             " OR " +
-                " \"TXN_TYPE\" = " + TxnType.REPL_CREATED.getValue() +
-                " AND \"TXN_LAST_HEARTBEAT\" <  " + getEpochFn(dbProduct) + 
"-" + replicationTxnTimeout +
-             ")";
-        //safety valve for extreme cases
-        s = sqlGenerator.addLimitClause(10 * TIMED_OUT_TXN_ABORT_BATCH_SIZE, 
s);
-        LOG.debug("Going to execute query <{}>", s);
-        rs = stmt.executeQuery(s);
-        if(!rs.next()) {
-          return;//no more timedout txns
-        }
-        List<List<Long>> timedOutTxns = new ArrayList<>();
-        List<Long> currentBatch = new 
ArrayList<>(TIMED_OUT_TXN_ABORT_BATCH_SIZE);
-        timedOutTxns.add(currentBatch);
-        do {
-          if(currentBatch.size() == TIMED_OUT_TXN_ABORT_BATCH_SIZE) {
-            currentBatch = new ArrayList<>(TIMED_OUT_TXN_ABORT_BATCH_SIZE);
-            timedOutTxns.add(currentBatch);
-          }
-          currentBatch.add(rs.getLong(1));
-        } while(rs.next());
-        dbConn.commit();
-        close(rs, stmt, null);
-        int numTxnsAborted = 0;
-        for(List<Long> batchToAbort : timedOutTxns) {
-          if (abortTxns(dbConn, batchToAbort, true, false, false, 
TxnErrorMsg.ABORT_TIMEOUT) == batchToAbort.size()) {
-            dbConn.commit();
-            numTxnsAborted += batchToAbort.size();
-            //todo: add TXNS.COMMENT filed and set it to 'aborted by system 
due to timeout'
-          }
-          else {
-            //could not abort all txns in this batch - this may happen because 
in parallel with this
-            //operation there was activity on one of the txns in this batch 
(commit/abort/heartbeat)
-            //This is not likely but may happen if client experiences long 
pause between heartbeats or
-            //unusually long/extreme pauses between heartbeat() calls and 
other logic in checkLock(),
-            //lock(), etc.
-            dbConn.rollback();
+      retryHandler.executeWithoutRetry(
+          new RetryCallProperties()
+              .withCallerId("performTimeOuts()")
+              .withDataSource(POOL_TX)
+              .withExceptionSupplier(e -> new MetaException("Aborting timed 
out transactions failed due to " + RetryHandler.getMessage(e))),
+          (DataSourceWrapper dataSourceWrapper) -> {

Review Comment:
   done



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -1418,13 +1453,17 @@ public void commitTxn(CommitTxnRequest rqst) throws 
NoSuchTxnException, TxnAbort
 
     boolean isReplayedReplTxn = 
TxnType.REPL_CREATED.equals(rqst.getTxn_type());
     boolean isHiveReplTxn = rqst.isSetReplPolicy() && 
TxnType.DEFAULT.equals(rqst.getTxn_type());
+    TransactionStatus status = null;
     try {
+      //start a new transaction
+      status = jdbcTemplate.getTransactionWithinRetryContext(new 
DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_REQUIRED), 
POOL_TX);

Review Comment:
   You are absolutely right on this one. The reason for this mixing approach is 
the following: To reduce the size of the PR (it's already huge) I split it into 
CompactionTxnHandler and TxnHandler cleanup. Unfortunately there are situations 
where a TxnHandler method calls a CompactionTxnHandler one or vice versa. In 
these cases there were three options:
   
   - Skip these methods from the redesign for now. I dropped this approach as it 
would lead to leaving almost half of CompactionTxnHandler untouched. So those 
changes would make the second PR larger.
   - Redesign the necessary TxnHandler methods as well. I dropped this approach 
as it would lead to redesigning at least 1/3 of TxnHandler in this PR.
   - My chosen approach: In the affected TxnHandler methods replace the 
getConnection and commit/rollback calls only. As a result the RetryContext is 
established and CompactionTxnHandler methods can be called without passing the 
Connection object. On the other hand, changes in TxnHandler are very limited, 
the rest of these methods are untouched.
   
   Of course this mixed approach is bad and therefore must be temporary. 
However, I still thought this is the best compromise among the possibilities. I 
already started removing these mixed calls in the TxnHandler cleanup PR.



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -5755,80 +5807,86 @@ private void timeOutLocks(Connection dbConn) {
    */
   @RetrySemantics.Idempotent
   public void performTimeOuts() {
-    Connection dbConn = null;
-    Statement stmt = null;
-    ResultSet rs = null;
     try {
-      dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-      //We currently commit after selecting the TXNS to abort.  So whether 
SERIALIZABLE
-      //READ_COMMITTED, the effect is the same.  We could use FOR UPDATE on 
Select from TXNS
-      //and do the whole performTimeOuts() in a single huge transaction, but 
the only benefit
-      //would be to make sure someone cannot heartbeat one of these txns at 
the same time.
-      //The attempt to heartbeat would block and fail immediately after it's 
unblocked.
-      //With current (RC + multiple txns) implementation it is possible for 
someone to send
-      //heartbeat at the very end of the expire interval, and just after the 
Select from TXNS
-      //is made, in which case heartbeat will succeed but txn will still be 
Aborted.
-      //Solving this corner case is not worth the perf penalty.  The client 
should heartbeat in a
-      //timely way.
-      timeOutLocks(dbConn);
-      while(true) {
-        stmt = dbConn.createStatement();
-        String s = " \"TXN_ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = " + 
TxnStatus.OPEN +
-          " AND (" +
-                "\"TXN_TYPE\" != " + TxnType.REPL_CREATED.getValue() +
-                " AND \"TXN_LAST_HEARTBEAT\" <  " + getEpochFn(dbProduct) + 
"-" + timeout +
-             " OR " +
-                " \"TXN_TYPE\" = " + TxnType.REPL_CREATED.getValue() +
-                " AND \"TXN_LAST_HEARTBEAT\" <  " + getEpochFn(dbProduct) + 
"-" + replicationTxnTimeout +
-             ")";
-        //safety valve for extreme cases
-        s = sqlGenerator.addLimitClause(10 * TIMED_OUT_TXN_ABORT_BATCH_SIZE, 
s);
-        LOG.debug("Going to execute query <{}>", s);
-        rs = stmt.executeQuery(s);
-        if(!rs.next()) {
-          return;//no more timedout txns
-        }
-        List<List<Long>> timedOutTxns = new ArrayList<>();
-        List<Long> currentBatch = new 
ArrayList<>(TIMED_OUT_TXN_ABORT_BATCH_SIZE);
-        timedOutTxns.add(currentBatch);
-        do {
-          if(currentBatch.size() == TIMED_OUT_TXN_ABORT_BATCH_SIZE) {
-            currentBatch = new ArrayList<>(TIMED_OUT_TXN_ABORT_BATCH_SIZE);
-            timedOutTxns.add(currentBatch);
-          }
-          currentBatch.add(rs.getLong(1));
-        } while(rs.next());
-        dbConn.commit();
-        close(rs, stmt, null);
-        int numTxnsAborted = 0;
-        for(List<Long> batchToAbort : timedOutTxns) {
-          if (abortTxns(dbConn, batchToAbort, true, false, false, 
TxnErrorMsg.ABORT_TIMEOUT) == batchToAbort.size()) {
-            dbConn.commit();
-            numTxnsAborted += batchToAbort.size();
-            //todo: add TXNS.COMMENT filed and set it to 'aborted by system 
due to timeout'
-          }
-          else {
-            //could not abort all txns in this batch - this may happen because 
in parallel with this
-            //operation there was activity on one of the txns in this batch 
(commit/abort/heartbeat)
-            //This is not likely but may happen if client experiences long 
pause between heartbeats or
-            //unusually long/extreme pauses between heartbeat() calls and 
other logic in checkLock(),
-            //lock(), etc.
-            dbConn.rollback();
+      retryHandler.executeWithoutRetry(
+          new RetryCallProperties()
+              .withCallerId("performTimeOuts()")
+              .withDataSource(POOL_TX)
+              .withExceptionSupplier(e -> new MetaException("Aborting timed 
out transactions failed due to " + RetryHandler.getMessage(e))),
+          (DataSourceWrapper dataSourceWrapper) -> {
+            //We currently commit after selecting the TXNS to abort.  So 
whether SERIALIZABLE
+            //READ_COMMITTED, the effect is the same.  We could use FOR UPDATE 
on Select from TXNS
+            //and do the whole performTimeOuts() in a single huge transaction, 
but the only benefit
+            //would be to make sure someone cannot heartbeat one of these txns 
at the same time.
+            //The attempt to heartbeat would block and fail immediately after 
it's unblocked.
+            //With current (RC + multiple txns) implementation it is possible 
for someone to send
+            //heartbeat at the very end of the expiry interval, and just after 
the Select from TXNS
+            //is made, in which case heartbeat will succeed but txn will still 
be Aborted.
+            //Solving this corner case is not worth the perf penalty.  The 
client should heartbeat in a
+            //timely way.
+            timeOutLocks();
+            while (true) {
+              String s = " \"TXN_ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = " + 
TxnStatus.OPEN +
+                  " AND (" +
+                  "\"TXN_TYPE\" != " + TxnType.REPL_CREATED.getValue() +
+                  " AND \"TXN_LAST_HEARTBEAT\" <  " + getEpochFn(dbProduct) + 
"-" + timeout +
+                  " OR " +
+                  " \"TXN_TYPE\" = " + TxnType.REPL_CREATED.getValue() +
+                  " AND \"TXN_LAST_HEARTBEAT\" <  " + getEpochFn(dbProduct) + 
"-" + replicationTxnTimeout +
+                  ")";
+              //safety valve for extreme cases
+              s = sqlGenerator.addLimitClause(10 * 
TIMED_OUT_TXN_ABORT_BATCH_SIZE, s);
+
+              LOG.debug("Going to execute query <{}>", s);
+              List<List<Long>> timedOutTxns = 
dataSourceWrapper.getJdbcTemplate().query(s, rs -> {
+                List<List<Long>> txnbatch = new ArrayList<>();
+                List<Long> currentBatch = new 
ArrayList<>(TIMED_OUT_TXN_ABORT_BATCH_SIZE);
+                while (rs.next()) {
+                  currentBatch.add(rs.getLong(1));
+                  if (currentBatch.size() == TIMED_OUT_TXN_ABORT_BATCH_SIZE) {
+                    txnbatch.add(currentBatch);
+                    currentBatch = new 
ArrayList<>(TIMED_OUT_TXN_ABORT_BATCH_SIZE);
+                  }
+                }
+                if (currentBatch.size() > 0) {
+                  txnbatch.add(currentBatch);
+                }
+                return txnbatch;
+              });
+              //noinspection DataFlowIssue
+              if (timedOutTxns.size() == 0) {
+                return Void.TYPE;
+              }
+
+              TransactionStatus status = 
dataSourceWrapper.getTransactionStatus();
+              Object savePoint = status.createSavepoint();
+
+              int numTxnsAborted = 0;
+              for (List<Long> batchToAbort : timedOutTxns) {
+                status.releaseSavepoint(savePoint);
+                savePoint = status.createSavepoint();

Review Comment:
   This is just the jdbcTemplate equivalent of the old code; the old version had 
commit/rollback calls inside the loop. We can enhance it, but I would do that in 
a separate PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to