This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 764c0be4788 HIVE-29434: CommitTxnFunction refactor (#6290)
764c0be4788 is described below

commit 764c0be478874f593a9f8bf39bb784028e883ff4
Author: Denys Kuzmenko <[email protected]>
AuthorDate: Tue Feb 24 11:01:00 2026 +0200

    HIVE-29434: CommitTxnFunction refactor (#6290)
---
 .../txn/jdbc/functions/CommitTxnFunction.java      | 675 ++++++++++++---------
 1 file changed, 373 insertions(+), 302 deletions(-)

diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CommitTxnFunction.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CommitTxnFunction.java
index 661f7b37e6e..522ce558df5 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CommitTxnFunction.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CommitTxnFunction.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hive.metastore.txn.jdbc.functions;
 
-import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.hive.common.repl.ReplConst;
 import org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier;
 import org.apache.hadoop.hive.metastore.TransactionalMetaStoreEventListener;
@@ -60,21 +59,22 @@
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.jdbc.core.RowMapper;
 import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
 import org.springframework.jdbc.core.namedparam.SqlParameterSource;
 
 import java.sql.ResultSet;
-import java.sql.Statement;
 import java.sql.Types;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Set;
 import java.util.function.Function;
-import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
 import static org.apache.hadoop.hive.metastore.txn.TxnHandler.ConfVars;
 import static 
org.apache.hadoop.hive.metastore.txn.TxnHandler.notifyCommitOrAbortEvent;
 import static org.apache.hadoop.hive.metastore.txn.TxnUtils.getEpochFn;
@@ -84,6 +84,9 @@ public class CommitTxnFunction implements 
TransactionalFunction<TxnType> {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(CommitTxnFunction.class);
 
+  private static final EnumSet<TxnType> WRITE_SET_SKIP_TXN_TYPES =
+      EnumSet.of(TxnType.READ_ONLY, TxnType.SOFT_DELETE, TxnType.COMPACTION);
+
   private final CommitTxnRequest rqst;
   private final List<TransactionalMetaStoreEventListener> 
transactionalListeners;
 
@@ -94,23 +97,17 @@ public CommitTxnFunction(CommitTxnRequest rqst, 
List<TransactionalMetaStoreEvent
 
   @Override
   public TxnType execute(MultiDataSourceJdbcResource jdbcResource) throws 
MetaException, NoSuchTxnException, TxnAbortedException {
-    char isUpdateDelete = 'N';
     long txnid = rqst.getTxnid();
     long sourceTxnId = -1;
 
-    boolean isReplayedReplTxn = 
TxnType.REPL_CREATED.equals(rqst.getTxn_type());
-    boolean isHiveReplTxn = rqst.isSetReplPolicy() && 
TxnType.DEFAULT.equals(rqst.getTxn_type());
+    boolean isReplayedReplTxn = TxnType.REPL_CREATED == rqst.getTxn_type();
+    boolean isHiveReplTxn = rqst.isSetReplPolicy() && TxnType.DEFAULT == 
rqst.getTxn_type();
     //Find the write details for this transaction.
     //Doing it here before the metadata tables are updated below.
-    List<TxnWriteDetails> txnWriteDetails = new ArrayList<>();
-
-    if (!isHiveReplTxn) {
-      txnWriteDetails = jdbcResource.execute(new 
GetWriteIdsMappingForTxnIdsHandler(Set.of(rqst.getTxnid())));
+    List<TxnWriteDetails> txnWriteDetails = loadTxnWriteDetails(jdbcResource, 
isHiveReplTxn, txnid);
 
-    }
     // Get the current TXN
     TransactionContext context = 
jdbcResource.getTransactionManager().getActiveTransaction();
-    Long commitId = null;
 
     if (rqst.isSetReplLastIdInfo()) {
       updateReplId(jdbcResource, rqst.getReplLastIdInfo());
@@ -118,145 +115,99 @@ public TxnType execute(MultiDataSourceJdbcResource 
jdbcResource) throws MetaExce
 
     if (isReplayedReplTxn) {
       assert (rqst.isSetReplPolicy());
+      txnid = resolveTargetTxnId(jdbcResource, rqst.getTxnid());
       sourceTxnId = rqst.getTxnid();
-      List<Long> targetTxnIds = jdbcResource.execute(new 
TargetTxnIdListHandler(rqst.getReplPolicy(), 
Collections.singletonList(sourceTxnId)));
-      if (targetTxnIds.isEmpty()) {
-        // Idempotent case where txn was already closed or commit txn event 
received without
-        // corresponding open txn event.
-        LOG.info("Target txn id is missing for source txn id : {} and repl 
policy {}", sourceTxnId,
-            rqst.getReplPolicy());
-        throw new RollbackException(null);
-      }
-      assert targetTxnIds.size() == 1;
-      txnid = targetTxnIds.getFirst();
     }
 
-    /**
-     * Runs at READ_COMMITTED with S4U on TXNS row for "txnid".  S4U ensures 
that no other
-     * operation can change this txn (such acquiring locks). While lock() and 
commitTxn()
-     * should not normally run concurrently (for same txn) but could due to 
bugs in the client
-     * which could then corrupt internal transaction manager state.  Also 
competes with abortTxn().
-     */
-    TxnType txnType = jdbcResource.execute(new 
GetOpenTxnTypeAndLockHandler(jdbcResource.getSqlGenerator(), txnid));
+    TxnType txnType = getOpenTxnTypeAndLock(jdbcResource, txnid, 
isReplayedReplTxn);
     if (txnType == null) {
-      //if here, txn was not found (in expected state)
-      TxnStatus actualTxnStatus = jdbcResource.execute(new 
FindTxnStateHandler(txnid));
-      if (actualTxnStatus == TxnStatus.COMMITTED) {
-        if (isReplayedReplTxn) {
-          // in case of replication, idempotent is taken care by getTargetTxnId
-          LOG.warn("Invalid state COMMITTED for transactions started using 
replication replay task");
-        }
-        /**
-         * This makes the operation idempotent
-         * (assume that this is most likely due to retry logic)
-         */
-        LOG.info("Nth commitTxn({}) msg", JavaUtils.txnIdToString(txnid));
-        return null;
-      }
-      TxnUtils.raiseTxnUnexpectedState(actualTxnStatus, txnid);
+      return null;
     }
 
-    String conflictSQLSuffix = String.format("""
-        FROM "TXN_COMPONENTS" WHERE "TC_TXNID" = :txnId AND 
"TC_OPERATION_TYPE" IN (%s, %s)
-        """, OperationType.UPDATE, OperationType.DELETE);
-    long tempCommitId = TxnUtils.generateTemporaryId();
+    CommitInfo commitInfo = prepareWriteSetAndCheckConflicts(
+        jdbcResource, context,
+        txnid, txnType, isReplayedReplTxn);
 
-    if (txnType == TxnType.SOFT_DELETE || txnType == TxnType.COMPACTION) {
-      if (!ConfVars.useMinHistoryWriteId()) {
-        new AcquireTxnLockFunction(false).execute(jdbcResource);
-      }
-      commitId = jdbcResource.execute(new GetHighWaterMarkHandler());
+    handleCompletedTxnComponents(
+        jdbcResource,
+        txnid, txnType, commitInfo.opFlag(), isReplayedReplTxn, sourceTxnId);
 
-    } else if (txnType != TxnType.READ_ONLY && !isReplayedReplTxn) {
-      String writeSetInsertSql = """
-          INSERT INTO "WRITE_SET"
-            ("WS_DATABASE", "WS_TABLE", "WS_PARTITION", "WS_TXNID", 
"WS_COMMIT_ID", "WS_OPERATION_TYPE")
-          SELECT DISTINCT
-            "TC_DATABASE", "TC_TABLE", "TC_PARTITION", "TC_TXNID",
-            :commitId,
-            "TC_OPERATION_TYPE"
-          """;
-
-      boolean isUpdateOrDelete = 
Boolean.TRUE.equals(jdbcResource.getJdbcTemplate().query(
-          jdbcResource.getSqlGenerator()
-              .addLimitClause(1, "\"TC_OPERATION_TYPE\" " + conflictSQLSuffix),
-          new MapSqlParameterSource()
-              .addValue("txnId", txnid),
-          ResultSet::next));
-      
-      if (isUpdateOrDelete) {
-        isUpdateDelete = 'Y';
-        //if here it means currently committing txn performed update/delete 
and we should check WW conflict
-        /**
-         * "select distinct" is used below because
-         * 1. once we get to multi-statement txns, we only care to record that 
something was updated once
-         * 2. if {@link #addDynamicPartitions(AddDynamicPartitions)} is 
retried by caller it may create
-         *  duplicate entries in TXN_COMPONENTS
-         * but we want to add a PK on WRITE_SET which won't have unique rows 
w/o this distinct
-         * even if it includes all of its columns
-         *
-         * First insert into write_set using a temporary commitID, which will 
be updated in a separate call,
-         * see: {@link #updateWSCommitIdAndCleanUpMetadata(Statement, long, 
TxnType, Long, long)}}.
-         * This should decrease the scope of the S4U lock on the next_txn_id 
table.
-         */
-        Object undoWriteSetForCurrentTxn = context.createSavepoint();
-        jdbcResource.getJdbcTemplate().update(
-            writeSetInsertSql + (ConfVars.useMinHistoryLevel() ? 
conflictSQLSuffix :
-                "FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\"= :txnId" + (
-                    (txnType != TxnType.REBALANCE_COMPACTION) ? "" : " AND 
\"TC_OPERATION_TYPE\" <> :type")),
-            new MapSqlParameterSource()
-                .addValue("txnId", txnid)
-                .addValue("type", OperationType.COMPACT.getSqlConst())
-                .addValue("commitId", tempCommitId));
+    if (rqst.isSetKeyValue()) {
+      updateKeyValueAssociatedWithTxn(jdbcResource, rqst);
+    }
 
-        /**
-         * This S4U will mutex with other commitTxn() and openTxns().
-         * -1 below makes txn intervals look like [3,3] [4,4] if all txns are 
serial
-         * Note: it's possible to have several txns have the same commit id.  
Suppose 3 txns start
-         * at the same time and no new txns start until all 3 commit.
-         * We could've incremented the sequence for commitId as well but it 
doesn't add anything functionally.
-         */
-        new AcquireTxnLockFunction(false).execute(jdbcResource);
-        commitId = jdbcResource.execute(new GetHighWaterMarkHandler());
-
-        if (!rqst.isExclWriteEnabled()) {
-          /**
-           * see if there are any overlapping txns that wrote the same 
element, i.e. have a conflict
-           * Since entire commit operation is mutexed wrt other start/commit 
ops,
-           * committed.ws_commit_id <= current.ws_commit_id for all txns
-           * thus if committed.ws_commit_id < current.ws_txnid, transactions 
do NOT overlap
-           * For example, [17,20] is committed, [6,80] is being committed 
right now - these overlap
-           * [17,20] committed and [21,21] committing now - these do not 
overlap.
-           * [17,18] committed and [18,19] committing now - these overlap  
(here 18 started while 17 was still running)
-           */
-          WriteSetInfo info = checkForWriteConflict(jdbcResource, txnid);
-          if (info != null) {
-            //found a conflict, so let's abort the txn
-            String committedTxn = "[" + JavaUtils.txnIdToString(info.txnId) + 
"," + info.committedCommitId + "]";
-            StringBuilder resource = new 
StringBuilder(info.database).append("/").append(info.table);
-            if (info.partition != null) {
-              resource.append('/').append(info.partition);
-            }
-            String msg = "Aborting [" + JavaUtils.txnIdToString(txnid) + "," + 
commitId + "]" + " due to a write conflict on " + resource +
-                " committed by " + committedTxn + " " + 
info.currentOperationType + "/" + info.committedOperationType;
-            //remove WRITE_SET info for current txn since it's about to abort
-            context.rollbackToSavepoint(undoWriteSetForCurrentTxn);
-            LOG.info(msg);
-            //todo: should make abortTxns() write something into 
TXNS.TXN_META_INFO about this
-            if (new AbortTxnsFunction(Collections.singletonList(txnid), false, 
false, 
-                isReplayedReplTxn, 
TxnErrorMsg.ABORT_WRITE_CONFLICT).execute(jdbcResource) != 1) {
-              throw new IllegalStateException(msg + " FAILED!");
-            }
-            throw new TxnAbortedException(msg);
-          }
-        }
-      } else if (!ConfVars.useMinHistoryLevel()) {
-        jdbcResource.getJdbcTemplate().update(writeSetInsertSql + "FROM 
\"TXN_COMPONENTS\" WHERE \"TC_TXNID\" = :txnId",
-            new MapSqlParameterSource()
-                .addValue("txnId", txnid)
-                .addValue("commitId", jdbcResource.execute(new 
GetHighWaterMarkHandler())));
+    if (!isHiveReplTxn) {
+      createCommitNotificationEvent(jdbcResource, txnid , txnType, 
txnWriteDetails);
+    }
+
+    updateWSCommitIdAndCleanUpMetadata(jdbcResource, txnid, txnType, 
commitInfo);
+    LOG.debug("Going to commit");
+
+    if (MetastoreConf.getBoolVar(jdbcResource.getConf(), 
MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON)) {
+      
Metrics.getOrCreateCounter(MetricsConstants.TOTAL_NUM_COMMITTED_TXNS).inc();
+    }
+    return txnType;
+  }
+
+  private List<TxnWriteDetails> 
loadTxnWriteDetails(MultiDataSourceJdbcResource jdbcResource,
+      boolean isHiveReplTxn, long txnid) throws MetaException {
+    if (isHiveReplTxn) {
+      return new ArrayList<>();
+    }
+    return jdbcResource.execute(
+        new GetWriteIdsMappingForTxnIdsHandler(Set.of(txnid)));
+  }
+
+  private Long resolveTargetTxnId(MultiDataSourceJdbcResource jdbcResource, 
long sourceTxnId)
+      throws MetaException {
+    List<Long> targetTxnIds = jdbcResource.execute(
+        new TargetTxnIdListHandler(rqst.getReplPolicy(), 
List.of(sourceTxnId)));
+    if (targetTxnIds.isEmpty()) {
+      // Idempotent case where txn was already closed or commit txn event 
received without
+      // corresponding open txn event.
+      LOG.info("Target txn id is missing for source txn id : {} and repl 
policy {}", sourceTxnId,
+          rqst.getReplPolicy());
+      throw new RollbackException(null);
+    }
+    return targetTxnIds.getFirst();
+  }
+
+  /**
+   * Runs at READ_COMMITTED with S4U on TXNS row for "txnid".  S4U ensures 
that no other
+   * operation can change this txn (such as acquiring locks). While lock() and 
commitTxn()
+   * should not normally run concurrently (for same txn) but could due to bugs 
in the client
+   * which could then corrupt internal transaction manager state.  Also 
competes with abortTxn().
+   */
+  private TxnType getOpenTxnTypeAndLock(MultiDataSourceJdbcResource 
jdbcResource, long txnid,
+      boolean isReplayedReplTxn) throws MetaException, NoSuchTxnException, 
TxnAbortedException {
+    TxnType txnType = jdbcResource.execute(
+        new GetOpenTxnTypeAndLockHandler(jdbcResource.getSqlGenerator(), 
txnid));
+    if (txnType != null) {
+      return txnType;
+    }
+    //if here, txn was not found (in expected state)
+    TxnStatus actualTxnStatus = jdbcResource.execute(new 
FindTxnStateHandler(txnid));
+    if (actualTxnStatus == TxnStatus.COMMITTED) {
+      if (isReplayedReplTxn) {
+        // in case of replication, idempotency is taken care of by getTargetTxnId
+        LOG.warn("Invalid state COMMITTED for transactions started using 
replication replay task");
       }
-    } else {
+      /**
+       * This makes the operation idempotent
+       * (assume that this is most likely due to retry logic)
+       */
+      LOG.info("Nth commitTxn({}) msg", JavaUtils.txnIdToString(txnid));
+      return null;
+    }
+    TxnUtils.raiseTxnUnexpectedState(actualTxnStatus, txnid);
+    return null;
+  }
+
+  private CommitInfo 
prepareWriteSetAndCheckConflicts(MultiDataSourceJdbcResource jdbcResource,
+      TransactionContext context, long txnid, TxnType txnType, boolean 
isReplayedReplTxn)
+      throws MetaException, TxnAbortedException {
+
+    if (WRITE_SET_SKIP_TXN_TYPES.contains(txnType) || isReplayedReplTxn) {
       /*
        * current txn didn't update/delete anything (may have inserted), so 
just proceed with commit
        *
@@ -267,36 +218,197 @@ public TxnType execute(MultiDataSourceJdbcResource 
jdbcResource) throws MetaExce
        * If RO < W, then there is no reads-from relationship.
        * In replication flow we don't expect any write write conflict as it 
should have been handled at source.
        */
-      assert true;
+      return CommitInfo.empty();
     }
 
+    String conflictSQLSuffix = String.format("""
+        FROM "TXN_COMPONENTS" WHERE "TC_TXNID" = :txnId AND 
"TC_OPERATION_TYPE" IN (%s, %s)
+        """, OperationType.UPDATE, OperationType.DELETE);
+    String writeSetInsertSql = """
+        INSERT INTO "WRITE_SET"
+          ("WS_DATABASE", "WS_TABLE", "WS_PARTITION", "WS_TXNID", 
"WS_COMMIT_ID", "WS_OPERATION_TYPE")
+        SELECT DISTINCT
+          "TC_DATABASE", "TC_TABLE", "TC_PARTITION", "TC_TXNID",
+          :commitId,
+          "TC_OPERATION_TYPE"
+        """;
+
+    boolean isUpdateOrDelete = 
Boolean.TRUE.equals(jdbcResource.getJdbcTemplate().query(
+        jdbcResource.getSqlGenerator()
+            .addLimitClause(1, "\"TC_OPERATION_TYPE\" " + conflictSQLSuffix),
+        new MapSqlParameterSource()
+            .addValue("txnId", txnid),
+        ResultSet::next));
+
+    Long commitId = null;
+    long tempCommitId = 0L;
+    
+    if (isUpdateOrDelete) {
+      tempCommitId = TxnUtils.generateTemporaryId();
+      //if here it means currently committing txn performed update/delete and 
we should check WW conflict
+      /**
+       * "select distinct" is used below because
+       * 1. once we get to multi-statement txns, we only care to record that 
something was updated once
+       * 2. if {@link #addDynamicPartitions(AddDynamicPartitions)} is retried 
by caller it may create
+       *  duplicate entries in TXN_COMPONENTS
+       * but we want to add a PK on WRITE_SET which won't have unique rows w/o 
this distinct
+       * even if it includes all of its columns
+       *
+       * First insert into write_set using a temporary commitID, which will be 
updated in a separate call,
+       * see: {@link 
#updateWSCommitIdAndCleanUpMetadata(MultiDataSourceJdbcResource, long, TxnType, 
Long, long)}}.
+       * This should decrease the scope of the S4U lock on the next_txn_id 
table.
+       */
+      Object undoWriteSetForCurrentTxn = context.createSavepoint();
+      jdbcResource.getJdbcTemplate().update(
+          writeSetInsertSql + (ConfVars.useMinHistoryLevel() ? 
conflictSQLSuffix :
+          "FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\"= :txnId" + (
+              (txnType != TxnType.REBALANCE_COMPACTION) ? "" : " AND 
\"TC_OPERATION_TYPE\" <> :type")),
+          new MapSqlParameterSource()
+              .addValue("txnId", txnid)
+              .addValue("type", OperationType.COMPACT.getSqlConst())
+              .addValue("commitId", tempCommitId));
+
+      /**
+       * This S4U will mutex with other commitTxn() and openTxns().
+       * -1 below makes txn intervals look like [3,3] [4,4] if all txns are 
serial
+       * Note: it's possible to have several txns have the same commit id.  
Suppose 3 txns start
+       * at the same time and no new txns start until all 3 commit.
+       * We could've incremented the sequence for commitId as well but it 
doesn't add anything functionally.
+       */
+      new AcquireTxnLockFunction(false).execute(jdbcResource);
+      commitId = jdbcResource.execute(new GetHighWaterMarkHandler());
+
+      if (!rqst.isExclWriteEnabled()) {
+        /**
+         * see if there are any overlapping txns that wrote the same element, 
i.e. have a conflict
+         * Since entire commit operation is mutexed wrt other start/commit ops,
+         * committed.ws_commit_id <= current.ws_commit_id for all txns
+         * thus if committed.ws_commit_id < current.ws_txnid, transactions do 
NOT overlap
+         * For example, [17,20] is committed, [6,80] is being committed right 
now - these overlap
+         * [17,20] committed and [21,21] committing now - these do not overlap.
+         * [17,18] committed and [18,19] committing now - these overlap  (here 
18 started while 17 was still running)
+         */
+        WriteSetInfo info = checkForWriteConflict(jdbcResource, txnid);
+        if (info != null) {
+          handleWriteConflict(jdbcResource, context, 
undoWriteSetForCurrentTxn, info, txnid, commitId,
+              isReplayedReplTxn);
+        }
+      }
+    } else if (!ConfVars.useMinHistoryLevel()) {
+      jdbcResource.getJdbcTemplate().update(writeSetInsertSql + "FROM 
\"TXN_COMPONENTS\" WHERE \"TC_TXNID\" = :txnId",
+          new MapSqlParameterSource()
+              .addValue("txnId", txnid)
+              .addValue("commitId", jdbcResource.execute(new 
GetHighWaterMarkHandler())));
+    }
+
+    return new CommitInfo(tempCommitId, commitId, isUpdateOrDelete);
+  }
+
+  private WriteSetInfo checkForWriteConflict(MultiDataSourceJdbcResource 
jdbcResource, long txnid)
+      throws MetaException {
+    String writeConflictQuery = 
jdbcResource.getSqlGenerator().addLimitClause(1, 
+        "\"COMMITTED\".\"WS_TXNID\", \"COMMITTED\".\"WS_COMMIT_ID\", " +
+        "\"COMMITTED\".\"WS_DATABASE\", \"COMMITTED\".\"WS_TABLE\", 
\"COMMITTED\".\"WS_PARTITION\", " +
+        "\"CUR\".\"WS_OPERATION_TYPE\" \"CUR_OP\", 
\"COMMITTED\".\"WS_OPERATION_TYPE\" \"COMMITTED_OP\" " +
+        "FROM \"WRITE_SET\" \"COMMITTED\" INNER JOIN \"WRITE_SET\" \"CUR\" " +
+        "ON \"COMMITTED\".\"WS_DATABASE\"=\"CUR\".\"WS_DATABASE\" AND 
\"COMMITTED\".\"WS_TABLE\"=\"CUR\".\"WS_TABLE\" " +
+        //For partitioned table we always track writes at partition level 
(never at table)
+        //and for non partitioned - always at table level, thus the same table 
should never
+        //have entries with partition key and w/o
+        "AND (\"COMMITTED\".\"WS_PARTITION\"=\"CUR\".\"WS_PARTITION\" OR 
(\"COMMITTED\".\"WS_PARTITION\" IS NULL AND \"CUR\".\"WS_PARTITION\" IS NULL)) 
" +
+        "WHERE \"CUR\".\"WS_TXNID\" <= \"COMMITTED\".\"WS_COMMIT_ID\" " + 
//txns overlap; could replace ws_txnid
+        // with txnid, though any decent DB should infer this
+        "AND \"CUR\".\"WS_TXNID\"= :txnId " + //make sure RHS of join only has 
rows we just inserted as
+        // part of this commitTxn() op
+        "AND \"COMMITTED\".\"WS_TXNID\" <> :txnId " + //and LHS only has 
committed txns
+        //U+U and U+D and D+D is a conflict and we don't currently track I in 
WRITE_SET at all
+        //it may seem like D+D should not be in conflict but consider 2 
multi-stmt txns
+        //where each does "delete X + insert X, where X is a row with the same 
PK.  This is
+        //equivalent to an update of X but won't be in conflict unless D+D is 
in conflict.
+        //The same happens when Hive splits U=I+D early so it looks like 2 
branches of a
+        //multi-insert stmt (an Insert and a Delete branch).  It also 'feels'
+        // un-serializable to allow concurrent deletes
+        "AND (\"COMMITTED\".\"WS_OPERATION_TYPE\" IN(:opUpdate, :opDelete) " +
+        "AND \"CUR\".\"WS_OPERATION_TYPE\" IN(:opUpdate, :opDelete))");
+    LOG.debug("Going to execute query: <{}>", writeConflictQuery);
+    return jdbcResource.getJdbcTemplate().query(writeConflictQuery,
+        new MapSqlParameterSource()
+            .addValue("txnId", txnid)
+            .addValue("opUpdate", OperationType.UPDATE.getSqlConst())
+            .addValue("opDelete", OperationType.DELETE.getSqlConst()),
+        rs -> rs.next()
+            ? new WriteSetInfo(
+                rs.getLong("WS_TXNID"), rs.getLong("WS_COMMIT_ID"),
+                rs.getString("CUR_OP"), rs.getString("COMMITTED_OP"),
+                rs.getString("WS_DATABASE"), rs.getString("WS_TABLE"), 
rs.getString("WS_PARTITION")
+              )
+            : null);
+  }
+
+  private void handleWriteConflict(MultiDataSourceJdbcResource jdbcResource, 
TransactionContext context,
+      Object undoWriteSetForCurrentTxn, WriteSetInfo info, long txnid, Long 
commitId, boolean isReplayedReplTxn)
+      throws MetaException, TxnAbortedException {
+    //found a conflict, so let's abort the txn
+    String committedTxn = "[" + JavaUtils.txnIdToString(info.txnId()) + "," + 
info.commitId() + "]";
+    StringBuilder resource = new 
StringBuilder(info.database()).append("/").append(info.table());
+    if (info.partition() != null) {
+      resource.append('/').append(info.partition());
+    }
+    String msg = "Aborting [" + JavaUtils.txnIdToString(txnid) + "," + 
commitId + "]" + " due to a write conflict on " + resource +
+        " committed by " + committedTxn + " " + info.currOpType() + "/" + 
info.opType();
+    //remove WRITE_SET info for current txn since it's about to abort
+    context.rollbackToSavepoint(undoWriteSetForCurrentTxn);
+    LOG.info(msg);
+    //TODO: should make abortTxns() write something into TXNS.TXN_META_INFO 
about this
+    int count = new AbortTxnsFunction(Collections.singletonList(txnid), false, 
false,
+        isReplayedReplTxn, 
TxnErrorMsg.ABORT_WRITE_CONFLICT).execute(jdbcResource);
+    if (count != 1) {
+      throw new IllegalStateException(msg + " FAILED!");
+    }
+    throw new TxnAbortedException(msg);
+  }
+
+  private void handleCompletedTxnComponents(MultiDataSourceJdbcResource 
jdbcResource, long txnid, TxnType txnType,
+      char isUpdateDelete, boolean isReplayedReplTxn, long sourceTxnId) throws 
MetaException {
     if (txnType != TxnType.READ_ONLY && !isReplayedReplTxn && 
!MetaStoreServerUtils.isCompactionTxn(txnType)) {
       moveTxnComponentsToCompleted(jdbcResource, txnid, isUpdateDelete);
+
     } else if (isReplayedReplTxn) {
       if (rqst.isSetWriteEventInfos() && !rqst.getWriteEventInfos().isEmpty()) 
{
-        jdbcResource.execute(new InsertCompletedTxnComponentsCommand(txnid, 
isUpdateDelete, rqst.getWriteEventInfos()));
+        jdbcResource.execute(
+            new InsertCompletedTxnComponentsCommand(txnid, isUpdateDelete, 
rqst.getWriteEventInfos()));
       }
-      jdbcResource.execute(new DeleteReplTxnMapEntryCommand(sourceTxnId, 
rqst.getReplPolicy()));
-    }
-    updateWSCommitIdAndCleanUpMetadata(jdbcResource, txnid, txnType, commitId, 
tempCommitId);
-
-    if (rqst.isSetKeyValue()) {
-      updateKeyValueAssociatedWithTxn(jdbcResource, rqst);
-    }
-
-    if (!isHiveReplTxn) {
-      createCommitNotificationEvent(jdbcResource, txnid , txnType, 
txnWriteDetails);
+      jdbcResource.execute(
+          new DeleteReplTxnMapEntryCommand(sourceTxnId, rqst.getReplPolicy()));
     }
+  }
 
-    LOG.debug("Going to commit");
+  private void moveTxnComponentsToCompleted(MultiDataSourceJdbcResource 
jdbcResource, long txnid, char isUpdateDelete) {
+    // Move the record from txn_components into completed_txn_components so 
that the compactor
+    // knows where to look to compact.
+    String query = "INSERT INTO \"COMPLETED_TXN_COMPONENTS\" (\"CTC_TXNID\", 
\"CTC_DATABASE\", " +
+        "\"CTC_TABLE\", \"CTC_PARTITION\", \"CTC_WRITEID\", 
\"CTC_UPDATE_DELETE\") SELECT \"TC_TXNID\", " +
+        "\"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", \"TC_WRITEID\", 
:flag FROM \"TXN_COMPONENTS\" " +
+        "WHERE \"TC_TXNID\" = :txnid AND \"TC_OPERATION_TYPE\" <> :type";
+    //we only track compactor activity in TXN_COMPONENTS to handle the case 
where the
+    //compactor txn aborts - so don't bother copying it to 
COMPLETED_TXN_COMPONENTS
+    LOG.debug("Going to execute insert <{}>", query);
+    int affectedRows = jdbcResource.getJdbcTemplate().update(query,
+        new MapSqlParameterSource()
+            .addValue("flag", Character.toString(isUpdateDelete), Types.CHAR)
+            .addValue("txnid", txnid)
+            .addValue("type", OperationType.COMPACT.getSqlConst(), 
Types.CHAR));
 
-    if (MetastoreConf.getBoolVar(jdbcResource.getConf(), 
MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON)) {
-      
Metrics.getOrCreateCounter(MetricsConstants.TOTAL_NUM_COMMITTED_TXNS).inc();
+    if (affectedRows < 1) {
+      //this can be reasonable for an empty txn START/COMMIT or read-only txn
+      //also an IUD with DP that didn't match any rows.
+      LOG.info("Expected to move at least one record from txn_components to "
+          + "completed_txn_components when committing txn! {}", 
JavaUtils.txnIdToString(txnid));
     }
-    return txnType;
   }
 
-  private void updateReplId(MultiDataSourceJdbcResource jdbcResource, 
ReplLastIdInfo replLastIdInfo) throws MetaException {
+  private void updateReplId(MultiDataSourceJdbcResource jdbcResource, 
ReplLastIdInfo replLastIdInfo)
+      throws MetaException {
     String lastReplId = Long.toString(replLastIdInfo.getLastReplId());
     String catalog = replLastIdInfo.isSetCatalog() ? 
normalizeIdentifier(replLastIdInfo.getCatalog()) :
         MetaStoreUtils.getDefaultCatalog(jdbcResource.getConf());
@@ -320,7 +432,7 @@ private void updateReplId(MultiDataSourceJdbcResource 
jdbcResource, ReplLastIdIn
   }
 
   private long updateDatabaseProp(MultiDataSourceJdbcResource jdbcResource, 
String catalog, String database, 
-                                  String prop, String propValue) throws 
MetaException {
+      String prop, String propValue) throws MetaException {
     String query = 
         "SELECT d.\"DB_ID\", dp.\"PARAM_KEY\", dp.\"PARAM_VALUE\" FROM 
\"DATABASE_PARAMS\" dp\n" +
             "RIGHT JOIN \"DBS\" d ON dp.\"DB_ID\" = d.\"DB_ID\" " +
@@ -334,7 +446,9 @@ private long updateDatabaseProp(MultiDataSourceJdbcResource 
jdbcResource, String
             .addValue("catalog", catalog),
         //no row means database no found
         rs -> rs.next()
-            ? new DbEntityParam(rs.getLong("DB_ID"), 
rs.getString("PARAM_KEY"), rs.getString("PARAM_VALUE"))
+            ? new DbEntityParam(
+                rs.getLong("DB_ID"), rs.getString("PARAM_KEY"), 
rs.getString("PARAM_VALUE")
+              )
             : null);
 
     if (dbEntityParam == null) {
@@ -343,31 +457,31 @@ private long 
updateDatabaseProp(MultiDataSourceJdbcResource jdbcResource, String
 
     //TODO: would be better to replace with MERGE or UPSERT
     String command;
-    if (dbEntityParam.key == null) {
+    if (dbEntityParam.key() == null) {
       command = "INSERT INTO \"DATABASE_PARAMS\" VALUES (:dbId, :key, :value)";
-    } else if (!dbEntityParam.value.equals(propValue)) {
+    } else if (!dbEntityParam.value().equals(propValue)) {
       command = "UPDATE \"DATABASE_PARAMS\" SET \"PARAM_VALUE\" = :value WHERE 
\"DB_ID\" = :dbId AND \"PARAM_KEY\" = :key";
     } else {
       LOG.info("Database property: {} with value: {} already updated for db: 
{}", prop, propValue, database);
-      return dbEntityParam.id;      
+      return dbEntityParam.id();
     }
     
     if (LOG.isDebugEnabled()) {
       LOG.debug("Updating {} for db: {}  using command {}", prop, database, 
command);
     }
     SqlParameterSource params = new MapSqlParameterSource()
-        .addValue("dbId", dbEntityParam.id)
+        .addValue("dbId", dbEntityParam.id())
         .addValue("key", prop)
         .addValue("value", propValue);
     if (jdbcResource.getJdbcTemplate().update(command, params) != 1) {
       //only one row insert or update should happen
       throw new RuntimeException("DATABASE_PARAMS is corrupted for database: " 
+ database);
     }
-    return dbEntityParam.id;
+    return dbEntityParam.id();
   }
 
   private long updateTableProp(MultiDataSourceJdbcResource jdbcResource, 
String catalog, String db, long dbId,
-                                  String table, String prop, String propValue) 
throws MetaException {
+      String table, String prop, String propValue) throws MetaException {
     String query = 
         "SELECT t.\"TBL_ID\", tp.\"PARAM_KEY\", tp.\"PARAM_VALUE\" FROM 
\"TABLE_PARAMS\" tp " +
             "RIGHT JOIN \"TBLS\" t ON tp.\"TBL_ID\" = t.\"TBL_ID\" " +
@@ -381,7 +495,9 @@ private long updateTableProp(MultiDataSourceJdbcResource 
jdbcResource, String ca
             .addValue("dbId", dbId),
        //no row means table not found
         rs -> rs.next() 
-            ? new DbEntityParam(rs.getLong("TBL_ID"), 
rs.getString("PARAM_KEY"), rs.getString("PARAM_VALUE")) 
+            ? new DbEntityParam(
+                rs.getLong("TBL_ID"), rs.getString("PARAM_KEY"), 
rs.getString("PARAM_VALUE")
+              )
             : null);
 
     if (dbEntityParam == null) {
@@ -390,56 +506,60 @@ private long updateTableProp(MultiDataSourceJdbcResource 
jdbcResource, String ca
 
     //TODO: would be better to replace with MERGE or UPSERT
     String command;
-    if (dbEntityParam.key == null) {
+    if (dbEntityParam.key() == null) {
       command = "INSERT INTO \"TABLE_PARAMS\" VALUES (:tblId, :key, :value)";
-    } else if (!dbEntityParam.value.equals(propValue)) {
+    } else if (!dbEntityParam.value().equals(propValue)) {
       command = "UPDATE \"TABLE_PARAMS\" SET \"PARAM_VALUE\" = :value WHERE 
\"TBL_ID\" = :dbId AND \"PARAM_KEY\" = :key";
     } else {
       LOG.info("Database property: {} with value: {} already updated for db: 
{}", prop, propValue, db);
-      return dbEntityParam.id;
+      return dbEntityParam.id();
     }
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Updating {} for table: {}  using command {}", prop, table, 
command);
     }
     SqlParameterSource params = new MapSqlParameterSource()
-        .addValue("tblId", dbEntityParam.id)
+        .addValue("tblId", dbEntityParam.id())
         .addValue("key", prop)
         .addValue("value", propValue);
     if (jdbcResource.getJdbcTemplate().update(command, params) != 1) {
       //only one row insert or update should happen
       throw new RuntimeException("TABLE_PARAMS is corrupted for table: " + 
table);
     }
-    return dbEntityParam.id;
+    return dbEntityParam.id();
   }
   
   private void updatePartitionProp(MultiDataSourceJdbcResource jdbcResource, 
long tableId,
-                                   List<String> partList, String prop, String 
propValue) {
+      List<String> partList, String prop, String propValue) {
     List<String> queries = new ArrayList<>();
     StringBuilder prefix = new StringBuilder();
     StringBuilder suffix = new StringBuilder();
     //language=SQL
-    prefix.append(
-        "SELECT p.\"PART_ID\", pp.\"PARAM_KEY\", pp.\"PARAM_VALUE\" FROM 
\"PARTITION_PARAMS\" pp\n" +
-        "RIGHT JOIN \"PARTITIONS\" p ON pp.\"PART_ID\" = p.\"PART_ID\" WHERE 
p.\"TBL_ID\" = :tblId AND pp.\"PARAM_KEY\" = :key");
+    prefix.append("""
+        SELECT p."PART_ID", pp."PARAM_KEY", pp."PARAM_VALUE" FROM 
"PARTITION_PARAMS" pp
+        RIGHT JOIN "PARTITIONS" p ON pp."PART_ID" = p."PART_ID" WHERE 
p."TBL_ID" = :tblId AND pp."PARAM_KEY" = :key""");
 
     // Populate the complete query with provided prefix and suffix
     TxnUtils.buildQueryWithINClauseStrings(jdbcResource.getConf(), queries, 
prefix, suffix, partList,
         "\"PART_NAME\"", true, false);
+
     SqlParameterSource params = new MapSqlParameterSource()
         .addValue("tblId", tableId)
         .addValue("key", prop);
+
+    RowMapper<DbEntityParam> mapper = (rs, rowNum) ->
+        new DbEntityParam(
+            rs.getLong("PART_ID"), rs.getString("PARAM_KEY"), 
rs.getString("PARAM_VALUE")
+        );
     List<DbEntityParam> partitionParams = new ArrayList<>();
-    for(String query : queries) {
+
+    for (String query : queries) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Going to execute query <" + query + ">");
       }
-      jdbcResource.getJdbcTemplate().query(query, params,
-          (ResultSet rs) -> {
-            while (rs.next()) {
-              partitionParams.add(new DbEntityParam(rs.getLong("PART_ID"), 
rs.getString("PARAM_KEY"), rs.getString("PARAM_VALUE")));
-            }
-          });
+      partitionParams.addAll(
+          jdbcResource.getJdbcTemplate().query(query, params, mapper)
+      );
     }
 
     //TODO: would be better to replace with MERGE or UPSERT
@@ -447,93 +567,41 @@ private void 
updatePartitionProp(MultiDataSourceJdbcResource jdbcResource, long
     //all insert in one batch
     int[][] inserts = 
jdbcResource.getJdbcTemplate().getJdbcTemplate().batchUpdate(
         "INSERT INTO \"PARTITION_PARAMS\" VALUES (?, ?, ?)",
-        partitionParams.stream().filter(p -> p.key == 
null).collect(Collectors.toList()), maxBatchSize,
+        partitionParams.stream()
+            .filter(p -> p.key() == null)
+            .toList(),
+        maxBatchSize,
         (ps, argument) -> {
-          ps.setLong(1, argument.id);
-          ps.setString(2, argument.key);
+          ps.setLong(1, argument.id());
+          ps.setString(2, argument.key());
           ps.setString(3, propValue);
         });
     //all update in one batch
     int[][] updates 
=jdbcResource.getJdbcTemplate().getJdbcTemplate().batchUpdate(
         "UPDATE \"PARTITION_PARAMS\" SET \"PARAM_VALUE\" = ? WHERE \"PART_ID\" 
= ? AND \"PARAM_KEY\" = ?",
-        partitionParams.stream().filter(p -> p.key != null && 
!propValue.equals(p.value)).collect(Collectors.toList()), maxBatchSize,
+        partitionParams.stream()
+            .filter(p -> p.key() != null && !propValue.equals(p.value()))
+            .toList(),
+        maxBatchSize,
         (ps, argument) -> {
           ps.setString(1, propValue);
-          ps.setLong(2, argument.id);
-          ps.setString(3, argument.key);
+          ps.setLong(2, argument.id());
+          ps.setString(3, argument.key());
         });
 
-    if (Arrays.stream(inserts).flatMapToInt(IntStream::of).sum() + 
Arrays.stream(updates).flatMapToInt(IntStream::of).sum() != partList.size()) {
+    int totalChanges =
+        Arrays.stream(inserts)
+            .flatMapToInt(IntStream::of)
+            .sum()
+      + Arrays.stream(updates)
+            .flatMapToInt(IntStream::of)
+            .sum();
+
+    if (totalChanges != partList.size()) {
       throw new RuntimeException("PARTITION_PARAMS is corrupted, update 
failed");      
     }    
   }
 
-  private WriteSetInfo checkForWriteConflict(MultiDataSourceJdbcResource 
jdbcResource, long txnid) throws MetaException {
-    String writeConflictQuery = 
jdbcResource.getSqlGenerator().addLimitClause(1, 
-        "\"COMMITTED\".\"WS_TXNID\", \"COMMITTED\".\"WS_COMMIT_ID\", " +
-        "\"COMMITTED\".\"WS_DATABASE\", \"COMMITTED\".\"WS_TABLE\", 
\"COMMITTED\".\"WS_PARTITION\", " +
-        "\"CUR\".\"WS_COMMIT_ID\" \"CUR_WS_COMMIT_ID\", 
\"CUR\".\"WS_OPERATION_TYPE\" \"CUR_OP\", " +
-        "\"COMMITTED\".\"WS_OPERATION_TYPE\" \"COMMITTED_OP\" FROM 
\"WRITE_SET\" \"COMMITTED\" INNER JOIN \"WRITE_SET\" \"CUR\" " +
-        "ON \"COMMITTED\".\"WS_DATABASE\"=\"CUR\".\"WS_DATABASE\" AND 
\"COMMITTED\".\"WS_TABLE\"=\"CUR\".\"WS_TABLE\" " +
-        //For partitioned table we always track writes at partition level 
(never at table)
-        //and for non partitioned - always at table level, thus the same table 
should never
-        //have entries with partition key and w/o
-        "AND (\"COMMITTED\".\"WS_PARTITION\"=\"CUR\".\"WS_PARTITION\" OR 
(\"COMMITTED\".\"WS_PARTITION\" IS NULL AND \"CUR\".\"WS_PARTITION\" IS NULL)) 
" +
-        "WHERE \"CUR\".\"WS_TXNID\" <= \"COMMITTED\".\"WS_COMMIT_ID\" " + 
//txns overlap; could replace ws_txnid
-        // with txnid, though any decent DB should infer this
-        "AND \"CUR\".\"WS_TXNID\"= :txnId " + //make sure RHS of join only has 
rows we just inserted as
-        // part of this commitTxn() op
-        "AND \"COMMITTED\".\"WS_TXNID\" <> :txnId " + //and LHS only has 
committed txns
-        //U+U and U+D and D+D is a conflict and we don't currently track I in 
WRITE_SET at all
-        //it may seem like D+D should not be in conflict but consider 2 
multi-stmt txns
-        //where each does "delete X + insert X, where X is a row with the same 
PK.  This is
-        //equivalent to an update of X but won't be in conflict unless D+D is 
in conflict.
-        //The same happens when Hive splits U=I+D early so it looks like 2 
branches of a
-        //multi-insert stmt (an Insert and a Delete branch).  It also 'feels'
-        // un-serializable to allow concurrent deletes
-        "AND (\"COMMITTED\".\"WS_OPERATION_TYPE\" IN(:opUpdate, :opDelete) " +
-        "AND \"CUR\".\"WS_OPERATION_TYPE\" IN(:opUpdate, :opDelete))");
-    LOG.debug("Going to execute query: <{}>", writeConflictQuery);
-    return jdbcResource.getJdbcTemplate().query(writeConflictQuery,
-        new MapSqlParameterSource()
-            .addValue("txnId", txnid)
-            .addValue("opUpdate", OperationType.UPDATE.getSqlConst())
-            .addValue("opDelete", OperationType.DELETE.getSqlConst()),
-        (ResultSet rs) -> {
-          if(rs.next()) {
-            return new WriteSetInfo(rs.getLong("WS_TXNID"), 
rs.getLong("CUR_WS_COMMIT_ID"),
-                rs.getLong("WS_COMMIT_ID"), rs.getString("CUR_OP"), 
rs.getString("COMMITTED_OP"),
-                rs.getString("WS_DATABASE"), rs.getString("WS_TABLE"), 
rs.getString("WS_PARTITION"));
-          } else {
-            return null;
-          }
-        });
-  }
-
-  private void moveTxnComponentsToCompleted(MultiDataSourceJdbcResource 
jdbcResource, long txnid, char isUpdateDelete) {
-    // Move the record from txn_components into completed_txn_components so 
that the compactor
-    // knows where to look to compact.
-    String query = "INSERT INTO \"COMPLETED_TXN_COMPONENTS\" (\"CTC_TXNID\", 
\"CTC_DATABASE\", " +
-        "\"CTC_TABLE\", \"CTC_PARTITION\", \"CTC_WRITEID\", 
\"CTC_UPDATE_DELETE\") SELECT \"TC_TXNID\", " +
-        "\"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", \"TC_WRITEID\", 
:flag FROM \"TXN_COMPONENTS\" " +
-        "WHERE \"TC_TXNID\" = :txnid AND \"TC_OPERATION_TYPE\" <> :type";
-    //we only track compactor activity in TXN_COMPONENTS to handle the case 
where the
-    //compactor txn aborts - so don't bother copying it to 
COMPLETED_TXN_COMPONENTS
-    LOG.debug("Going to execute insert <{}>", query);
-    int affectedRows = jdbcResource.getJdbcTemplate().update(query,
-        new MapSqlParameterSource()
-            .addValue("flag", Character.toString(isUpdateDelete), Types.CHAR)
-            .addValue("txnid", txnid)
-            .addValue("type", OperationType.COMPACT.getSqlConst(), 
Types.CHAR));
-
-    if (affectedRows < 1) {
-      //this can be reasonable for an empty txn START/COMMIT or read-only txn
-      //also an IUD with DP that didn't match any rows.
-      LOG.info("Expected to move at least one record from txn_components to "
-          + "completed_txn_components when committing txn! {}", 
JavaUtils.txnIdToString(txnid));
-    }
-  }
-
   private void updateKeyValueAssociatedWithTxn(MultiDataSourceJdbcResource 
jdbcResource, CommitTxnRequest rqst) {
     if (!rqst.getKeyValue().getKey().startsWith(TxnStore.TXN_KEY_START)) {
       String errorMsg = "Error updating key/value in the sql backend with"
@@ -567,12 +635,13 @@ private void 
updateKeyValueAssociatedWithTxn(MultiDataSourceJdbcResource jdbcRes
   }
 
   private void updateWSCommitIdAndCleanUpMetadata(MultiDataSourceJdbcResource 
jdbcResource,
-        long txnid, TxnType txnType, Long commitId, long tempId) throws 
MetaException {
+        long txnid, TxnType txnType, CommitInfo commitInfo) throws 
MetaException {
+
     List<String> queryBatch = new ArrayList<>(6);
     // update write_set with real commitId
-    if (commitId != null) {
-      queryBatch.add("UPDATE \"WRITE_SET\" SET \"WS_COMMIT_ID\" = " + commitId 
+
-          " WHERE \"WS_COMMIT_ID\" = " + tempId + " AND \"WS_TXNID\" = " + 
txnid);
+    if (!commitInfo.isEmpty()) {
+      queryBatch.add("UPDATE \"WRITE_SET\" SET \"WS_COMMIT_ID\" = " + 
commitInfo.commitId() +
+          " WHERE \"WS_COMMIT_ID\" = " + commitInfo.tempId() + " AND 
\"WS_TXNID\" = " + txnid);
     }
     // clean up txn related metadata
     if (txnType != TxnType.READ_ONLY) {
@@ -581,23 +650,36 @@ private void 
updateWSCommitIdAndCleanUpMetadata(MultiDataSourceJdbcResource jdbc
     queryBatch.add("DELETE FROM \"HIVE_LOCKS\" WHERE \"HL_TXNID\" = " + txnid);
     // DO NOT remove the transaction from the TXN table, the cleaner will 
remove it when appropriate
     queryBatch.add("UPDATE \"TXNS\" SET \"TXN_STATE\" = " + 
TxnStatus.COMMITTED + " WHERE \"TXN_ID\" = " + txnid);
+
     if (txnType == TxnType.MATER_VIEW_REBUILD) {
       queryBatch.add("DELETE FROM \"MATERIALIZATION_REBUILD_LOCKS\" WHERE 
\"MRL_TXN_ID\" = " + txnid);
     }
+
     if (txnType == TxnType.SOFT_DELETE || 
MetaStoreServerUtils.isCompactionTxn(txnType)) {
-      queryBatch.add("UPDATE \"COMPACTION_QUEUE\" SET \"CQ_NEXT_TXN_ID\" = " + 
commitId + ", \"CQ_COMMIT_TIME\" = " +
-          getEpochFn(jdbcResource.getDatabaseProduct()) + " WHERE 
\"CQ_TXN_ID\" = " + txnid);
+      // For soft-delete and compaction transactions, we need to set the 
commit id
+      if (commitInfo.isEmpty() && !ConfVars.useMinHistoryWriteId()) {
+        // Take a global txn lock to synchronize the openTxn and commitTxn 
operations
+        new AcquireTxnLockFunction(false).execute(jdbcResource);
+      }
+      queryBatch.add("""
+          UPDATE "COMPACTION_QUEUE" SET "CQ_NEXT_TXN_ID" = %s, 
"CQ_COMMIT_TIME" = %s
+          WHERE "CQ_TXN_ID" = %d"""
+          .formatted(
+              defaultIfNull(commitInfo.commitId(), "(SELECT MAX(\"TXN_ID\") 
FROM \"TXNS\")"),
+              getEpochFn(jdbcResource.getDatabaseProduct()),
+              txnid));
     }
 
     // execute all in one batch
-    
jdbcResource.getJdbcTemplate().getJdbcTemplate().batchUpdate(queryBatch.toArray(new
 String[0]));
+    jdbcResource.getJdbcTemplate().getJdbcTemplate().batchUpdate(
+        queryBatch.toArray(new String[0]));
 
     List<Function<List<Long>, InClauseBatchCommand<Long>>> commands = List.of(
         RemoveTxnsFromMinHistoryLevelCommand::new,
         RemoveWriteIdsFromMinHistoryCommand::new
     );
     for (var cmd : commands) {
-      jdbcResource.execute(cmd.apply(ImmutableList.of(txnid)));
+      jdbcResource.execute(cmd.apply(List.of(txnid)));
     }
   }
 
@@ -609,56 +691,45 @@ private void 
updateWSCommitIdAndCleanUpMetadata(MultiDataSourceJdbcResource jdbc
    * @param txnWriteDetails write details of the transaction
    * @throws MetaException ex
    */
-  private void createCommitNotificationEvent(MultiDataSourceJdbcResource 
jdbcResource, long txnid, TxnType txnType, List<TxnWriteDetails> 
txnWriteDetails)
-      throws MetaException {
+  private void createCommitNotificationEvent(MultiDataSourceJdbcResource 
jdbcResource, long txnid, TxnType txnType,
+      List<TxnWriteDetails> txnWriteDetails) throws MetaException {
     if (transactionalListeners != null) {
-      notifyCommitOrAbortEvent(txnid, EventMessage.EventType.COMMIT_TXN, 
txnType, jdbcResource.getConnection(), txnWriteDetails, transactionalListeners);
+      notifyCommitOrAbortEvent(
+          txnid, EventMessage.EventType.COMMIT_TXN, txnType, 
jdbcResource.getConnection(),
+          txnWriteDetails, transactionalListeners);
 
       CompactionInfo compactionInfo = jdbcResource.execute(new 
GetCompactionInfoHandler(txnid, true));
       if (compactionInfo != null) {
-        MetaStoreListenerNotifier
-            .notifyEventWithDirectSql(transactionalListeners, 
EventMessage.EventType.COMMIT_COMPACTION,
-                new CommitCompactionEvent(txnid, compactionInfo), 
jdbcResource.getConnection(), jdbcResource.getSqlGenerator());
+        MetaStoreListenerNotifier.notifyEventWithDirectSql(
+            transactionalListeners, EventMessage.EventType.COMMIT_COMPACTION, 
new CommitCompactionEvent(txnid, compactionInfo),
+            jdbcResource.getConnection(), jdbcResource.getSqlGenerator());
       } else {
         LOG.warn("No compaction queue record found for Compaction type 
transaction commit. txnId:" + txnid);
       }
-      
     }
   }
 
-  private static class DbEntityParam {
-    final long id;
-    final String key;
-    final String value;
-
-    public DbEntityParam(long id, String key, String value) {
-      this.id = id;
-      this.key = key;
-      this.value = value;
-    }
+  private record DbEntityParam(long id, String key, String value) {
   }
   
-  private static class WriteSetInfo {
-    final long txnId;
-    final long currentCommitId;
-    final long committedCommitId;
-    final String currentOperationType;
-    final String committedOperationType;
-    final String database;
-    final String table;
-    final String partition;
-
-    public WriteSetInfo(long txnId, long currentCommitId, long 
committedCommitId, 
-                        String currentOperationType, String 
committedOperationType, 
-                        String database, String table, String partition) {
-      this.txnId = txnId;
-      this.currentCommitId = currentCommitId;
-      this.committedCommitId = committedCommitId;
-      this.currentOperationType = currentOperationType;
-      this.committedOperationType = committedOperationType;
-      this.database = database;
-      this.table = table;
-      this.partition = partition;
+  private record WriteSetInfo(
+      long txnId, long commitId, String currOpType, String opType,
+      String database, String table, String partition) {
+  }
+
+  private record CommitInfo(
+      long tempId, Long commitId, boolean isUpdateOrDelete) {
+
+    private static CommitInfo empty() {
+      return new CommitInfo(0L, null, false);
+    }
+
+    public boolean isEmpty() {
+      return commitId == null;
+    }
+
+    public char opFlag() {
+      return isUpdateOrDelete ? 'Y' : 'N';
     }
   }
 

Reply via email to