deniskuzZ commented on code in PR #4566:
URL: https://github.com/apache/hive/pull/4566#discussion_r1419036479


##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -648,260 +431,85 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException {
       throw new MetaException("Invalid input for number of txns: " + numTxns);
     }
 
-    try {
-      Connection dbConn = null;
-      Statement stmt = null;
-      try {
-        /*
-         * To make {@link #getOpenTxns()}/{@link #getOpenTxnsInfo()} work correctly, this operation must ensure
-         * that looking at the TXNS table every open transaction could be identified below a given High Water Mark.
-         * One way to do it, would be to serialize the openTxns call with a S4U lock, but that would cause
-         * performance degradation with high transaction load.
-         * To enable parallel openTxn calls, we define a time period (TXN_OPENTXN_TIMEOUT) and consider every
-         * transaction missing from the TXNS table in that period open, and prevent opening transaction outside
-         * the period.
-         * Example: At t[0] there is one open transaction in the TXNS table, T[1].
-         * T[2] acquires the next sequence at t[1] but only commits into the TXNS table at t[10].
-         * T[3] acquires its sequence at t[2], and commits into the TXNS table at t[3].
-         * Then T[3] calculates it’s snapshot at t[4] and puts T[1] and also T[2] in the snapshot’s
-         * open transaction list. T[1] because it is presented as open in TXNS,
-         * T[2] because it is a missing sequence.
-         *
-         * In the current design, there can be several metastore instances running in a given Warehouse.
-         * This makes ideas like reserving a range of IDs to save trips to DB impossible.  For example,
-         * a client may go to MS1 and start a transaction with ID 500 to update a particular row.
-         * Now the same client will start another transaction, except it ends up on MS2 and may get
-         * transaction ID 400 and update the same row.  Now the merge that happens to materialize the snapshot
-         * on read will thing the version of the row from transaction ID 500 is the latest one.
-         *
-         * Longer term we can consider running Active-Passive MS (at least wrt to ACID operations).  This
-         * set could support a write-through cache for added performance.
-         */
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-        /*
-         * The openTxn and commitTxn must be mutexed, when committing a not read only transaction.
-         * This is achieved by requesting a shared table lock here, and an exclusive one at commit.
-         * Since table locks are working in Derby, we don't need the lockInternal call here.
-         * Example: Suppose we have two transactions with update like x = x+1.
-         * We have T[3,3] that was using a value from a snapshot with T[2,2]. If we allow committing T[3,3]
-         * and opening T[4] parallel it is possible, that T[4] will be using the value from a snapshot with T[2,2],
-         * and we will have a lost update problem
-         */
-        acquireTxnLock(stmt, true);
-        // Measure the time from acquiring the sequence value, till committing in the TXNS table
-        StopWatch generateTransactionWatch = new StopWatch();
-        generateTransactionWatch.start();
-
-        List<Long> txnIds = openTxns(dbConn, rqst);
+    /*
+     * To make {@link #getOpenTxns()}/{@link #getOpenTxnsInfo()} work correctly, this operation must ensure
+     * that looking at the TXNS table every open transaction could be identified below a given High Water Mark.
+     * One way to do it, would be to serialize the openTxns call with a S4U lock, but that would cause
+     * performance degradation with high transaction load.
+     * To enable parallel openTxn calls, we define a time period (TXN_OPENTXN_TIMEOUT) and consider every
+     * transaction missing from the TXNS table in that period open, and prevent opening transaction outside
+     * the period.
+     * Example: At t[0] there is one open transaction in the TXNS table, T[1].
+     * T[2] acquires the next sequence at t[1] but only commits into the TXNS table at t[10].
+     * T[3] acquires its sequence at t[2], and commits into the TXNS table at t[3].
+     * Then T[3] calculates it’s snapshot at t[4] and puts T[1] and also T[2] in the snapshot’s
+     * open transaction list. T[1] because it is presented as open in TXNS,
+     * T[2] because it is a missing sequence.
+     *
+     * In the current design, there can be several metastore instances running in a given Warehouse.
+     * This makes ideas like reserving a range of IDs to save trips to DB impossible.  For example,
+     * a client may go to MS1 and start a transaction with ID 500 to update a particular row.
+     * Now the same client will start another transaction, except it ends up on MS2 and may get
+     * transaction ID 400 and update the same row.  Now the merge that happens to materialize the snapshot
+     * on read will thing the version of the row from transaction ID 500 is the latest one.
+     *
+     * Longer term we can consider running Active-Passive MS (at least wrt to ACID operations).  This
+     * set could support a write-through cache for added performance.
+     */
+    /*
+     * The openTxn and commitTxn must be mutexed, when committing a not read only transaction.
+     * This is achieved by requesting a shared table lock here, and an exclusive one at commit.
+     * Since table locks are working in Derby, we don't need the lockInternal call here.
+     * Example: Suppose we have two transactions with update like x = x+1.
+     * We have T[3,3] that was using a value from a snapshot with T[2,2]. If we allow committing T[3,3]
+     * and opening T[4] parallel it is possible, that T[4] will be using the value from a snapshot with T[2,2],
+     * and we will have a lost update problem
+     */
+    acquireTxnLock(true);
+    // Measure the time from acquiring the sequence value, till committing in the TXNS table
+    StopWatch generateTransactionWatch = new StopWatch();
+    generateTransactionWatch.start();
 
-        LOG.debug("Going to commit");
-        dbConn.commit();
-        generateTransactionWatch.stop();
-        long elapsedMillis = generateTransactionWatch.getTime(TimeUnit.MILLISECONDS);
-        TxnType txnType = rqst.isSetTxn_type() ? rqst.getTxn_type() : TxnType.DEFAULT;
-        if (txnType != TxnType.READ_ONLY && elapsedMillis >= openTxnTimeOutMillis) {
-          /*
-           * The commit was too slow, we can not allow this to continue (except if it is read only,
-           * since that can not cause dirty reads).
-           * When calculating the snapshot for a given transaction, we look back for possible open transactions
-           * (that are not yet committed in the TXNS table), for TXN_OPENTXN_TIMEOUT period.
-           * We can not allow a write transaction, that was slower than TXN_OPENTXN_TIMEOUT to continue,
-           * because there can be other transactions running, that didn't considered this transactionId open,
-           * this could cause dirty reads.
-           */
-          LOG.error("OpenTxnTimeOut exceeded commit duration {}, deleting transactionIds: {}", elapsedMillis, txnIds);
-          deleteInvalidOpenTransactions(dbConn, txnIds);
-          dbConn.commit();
-          /*
-           * We do not throw RetryException directly, to not circumvent the max retry limit
-           */
-          throw new SQLException("OpenTxnTimeOut exceeded", MANUAL_RETRY);
-        }
-        return new OpenTxnsResponse(txnIds);
-      } catch (SQLException e) {
-        LOG.debug("Going to rollback: ", e);
-        rollbackDBConn(dbConn);
-        checkRetryable(e, "openTxns(" + rqst + ")");
-        throw new MetaException("Unable to select from transaction database " + StringUtils.stringifyException(e));
-      } finally {
-        close(null, stmt, dbConn);
-      }
-    } catch (RetryException e) {
-      return openTxns(rqst);
-    }
-  }
+    List<Long> txnIds = new OpenTxnsFunction(rqst, openTxnTimeOutMillis, transactionalListeners).execute(jdbcResource);
 
-  private List<Long> openTxns(Connection dbConn, OpenTxnRequest rqst)
-          throws SQLException, MetaException {
-    int numTxns = rqst.getNum_txns();
-    // Make sure the user has not requested an insane amount of txns.
-    int maxTxns = MetastoreConf.getIntVar(conf, ConfVars.TXN_MAX_OPEN_BATCH);
-    if (numTxns > maxTxns) {
-      numTxns = maxTxns;
-    }
-    List<PreparedStatement> insertPreparedStmts = null;
+    LOG.debug("Going to commit");
+    jdbcResource.getTransactionManager().getActiveTransaction().createSavepoint();

Review Comment:
   Why do we need this? It's not present in the original code.
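
For readers following the hunk above: the Javadoc describes how a snapshot treats a sequence value that is missing from TXNS as an open transaction. The sketch below is only a toy, in-memory model of that rule using the T[1]/T[2]/T[3] example from the comment. The class name, method name, and the map standing in for the TXNS table are hypothetical and are not part of TxnHandler; the real code also restricts the look-back to the TXN_OPENTXN_TIMEOUT period, which this sketch omits.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not Hive code.
public class OpenTxnSnapshotSketch {

    /**
     * Returns the transaction ids below ownTxnId that a snapshot should treat as open.
     *
     * visibleRows stands in for the committed content of the TXNS table:
     * key = transaction id, value = true if the row is in the open state.
     * An id with no visible row is a "missing sequence" and is treated as open.
     */
    static List<Long> openTxnsBelow(long ownTxnId, Map<Long, Boolean> visibleRows) {
        List<Long> open = new ArrayList<>();
        for (long id = 1; id < ownTxnId; id++) {
            Boolean isOpen = visibleRows.get(id);
            if (isOpen == null || isOpen) {
                open.add(id);
            }
        }
        return open;
    }

    public static void main(String[] args) {
        // State at t[4] in the comment's example: T[1] is visible and open,
        // T[2] has taken a sequence value but its row is not committed yet,
        // T[3] is visible and is the transaction computing its snapshot.
        Map<Long, Boolean> visibleRows = new HashMap<>();
        visibleRows.put(1L, true);
        visibleRows.put(3L, true);

        System.out.println(openTxnsBelow(3L, visibleRows)); // prints [1, 2]
    }
}
```

In this model T[1] ends up in the list because its row is visible as open, and T[2] because no row is visible for it, which matches the example in the Javadoc.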


