deniskuzZ commented on code in PR #4566:
URL: https://github.com/apache/hive/pull/4566#discussion_r1419090624


##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -1740,4039 +649,413 @@ public long getLatestTxnIdInConflict(long txnid) 
throws MetaException {
    * @param txnId
    * @throws MetaException
    */
-    private List<String> getTxnDbsUpdated(long txnId, Connection dbConn) 
throws MetaException {
+  private List<String> getTxnDbsUpdated(long txnId) throws MetaException {
     try {
-      try (Statement stmt = dbConn.createStatement()) {
-
-        String query = "SELECT DISTINCT \"T2W_DATABASE\" " +
-                " FROM \"TXN_TO_WRITE_ID\" \"COMMITTED\"" +
-                "   WHERE \"T2W_TXNID\" = " + txnId;
-
-        LOG.debug("Going to execute query: <{}>", query);
-        try (ResultSet rs = stmt.executeQuery(query)) {
-          List<String> dbsUpdated = new ArrayList<String>();
-          while (rs.next()) {
-            dbsUpdated.add(rs.getString(1));
-          }
-          return dbsUpdated;
-        }
-      } catch (SQLException e) {
-        checkRetryable(e, "getTxnDbsUpdated");
-        throw new MetaException(StringUtils.stringifyException(e));
-      }
-    } catch (RetryException e) {
-      return getTxnDbsUpdated(txnId, dbConn);
-    }
-  }
-
-
-  private ResultSet checkForWriteConflict(Statement stmt, long txnid) throws 
SQLException, MetaException {
-    String writeConflictQuery = sqlGenerator.addLimitClause(1, 
"\"COMMITTED\".\"WS_TXNID\", \"COMMITTED\".\"WS_COMMIT_ID\", " +
-            "\"COMMITTED\".\"WS_DATABASE\", \"COMMITTED\".\"WS_TABLE\", 
\"COMMITTED\".\"WS_PARTITION\", " +
-            "\"CUR\".\"WS_COMMIT_ID\" \"CUR_WS_COMMIT_ID\", 
\"CUR\".\"WS_OPERATION_TYPE\" \"CUR_OP\", " +
-            "\"COMMITTED\".\"WS_OPERATION_TYPE\" \"COMMITTED_OP\" FROM 
\"WRITE_SET\" \"COMMITTED\" INNER JOIN \"WRITE_SET\" \"CUR\" " +
-            "ON \"COMMITTED\".\"WS_DATABASE\"=\"CUR\".\"WS_DATABASE\" AND 
\"COMMITTED\".\"WS_TABLE\"=\"CUR\".\"WS_TABLE\" " +
-            //For partitioned table we always track writes at partition level 
(never at table)
-            //and for non partitioned - always at table level, thus the same 
table should never
-            //have entries with partition key and w/o
-            "AND (\"COMMITTED\".\"WS_PARTITION\"=\"CUR\".\"WS_PARTITION\" OR 
(\"COMMITTED\".\"WS_PARTITION\" IS NULL AND \"CUR\".\"WS_PARTITION\" IS NULL)) 
" +
-            "WHERE \"CUR\".\"WS_TXNID\" <= \"COMMITTED\".\"WS_COMMIT_ID\"" + 
//txns overlap; could replace ws_txnid
-            // with txnid, though any decent DB should infer this
-            " AND \"CUR\".\"WS_TXNID\"=" + txnid + //make sure RHS of join 
only has rows we just inserted as
-            // part of this commitTxn() op
-            " AND \"COMMITTED\".\"WS_TXNID\" <> " + txnid + //and LHS only has 
committed txns
-            //U+U and U+D and D+D is a conflict and we don't currently track I 
in WRITE_SET at all
-            //it may seem like D+D should not be in conflict but consider 2 
multi-stmt txns
-            //where each does "delete X + insert X, where X is a row with the 
same PK.  This is
-            //equivalent to an update of X but won't be in conflict unless D+D 
is in conflict.
-            //The same happens when Hive splits U=I+D early so it looks like 2 
branches of a
-            //multi-insert stmt (an Insert and a Delete branch).  It also 
'feels'
-            // un-serializable to allow concurrent deletes
-            " and (\"COMMITTED\".\"WS_OPERATION_TYPE\" IN(" + 
OperationType.UPDATE +
-            ", " + OperationType.DELETE +
-            ") AND \"CUR\".\"WS_OPERATION_TYPE\" IN(" + OperationType.UPDATE+ 
", "
-            + OperationType.DELETE + "))");
-    LOG.debug("Going to execute query: <{}>", writeConflictQuery);
-    return stmt.executeQuery(writeConflictQuery);
-  }
-
-  private void moveTxnComponentsToCompleted(Statement stmt, long txnid, char 
isUpdateDelete) throws SQLException {
-    // Move the record from txn_components into completed_txn_components so 
that the compactor
-    // knows where to look to compact.
-    String s = "INSERT INTO \"COMPLETED_TXN_COMPONENTS\" (\"CTC_TXNID\", 
\"CTC_DATABASE\", " +
-            "\"CTC_TABLE\", \"CTC_PARTITION\", \"CTC_WRITEID\", 
\"CTC_UPDATE_DELETE\") SELECT \"TC_TXNID\"," +
-        " \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", \"TC_WRITEID\", '" 
+ isUpdateDelete +
-        "' FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\" = " + txnid +
-        //we only track compactor activity in TXN_COMPONENTS to handle the 
case where the
-        //compactor txn aborts - so don't bother copying it to 
COMPLETED_TXN_COMPONENTS
-        " AND \"TC_OPERATION_TYPE\" <> " + OperationType.COMPACT;
-    LOG.debug("Going to execute insert <{}>", s);
-
-    if ((stmt.executeUpdate(s)) < 1) {
-      //this can be reasonable for an empty txn START/COMMIT or read-only txn
-      //also an IUD with DP that didn't match any rows.
-      LOG.info("Expected to move at least one record from txn_components to "
-          + "completed_txn_components when committing txn! {}", 
JavaUtils.txnIdToString(txnid));
-    }
-  }
-
-  /**
-   * See overridden method in CompactionTxnHandler also.
-   */
-  protected void updateWSCommitIdAndCleanUpMetadata(Statement stmt, long 
txnid, TxnType txnType,
-      Long commitId, long tempId) throws SQLException, MetaException {
-    List<String> queryBatch = new ArrayList<>(5);
-    // update write_set with real commitId
-    if (commitId != null) {
-      queryBatch.add("UPDATE \"WRITE_SET\" SET \"WS_COMMIT_ID\" = " + commitId 
+
-              " WHERE \"WS_COMMIT_ID\" = " + tempId + " AND \"WS_TXNID\" = " + 
txnid);
-    }
-    // clean up txn related metadata
-    if (txnType != TxnType.READ_ONLY) {
-      queryBatch.add("DELETE FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\" = " + 
txnid);
-    }
-    queryBatch.add("DELETE FROM \"HIVE_LOCKS\" WHERE \"HL_TXNID\" = " + txnid);
-    // DO NOT remove the transaction from the TXN table, the cleaner will 
remove it when appropriate
-    queryBatch.add("UPDATE \"TXNS\" SET \"TXN_STATE\" = " + 
TxnStatus.COMMITTED + " WHERE \"TXN_ID\" = " + txnid);
-    if (txnType == TxnType.MATER_VIEW_REBUILD) {
-      queryBatch.add("DELETE FROM \"MATERIALIZATION_REBUILD_LOCKS\" WHERE 
\"MRL_TXN_ID\" = " + txnid);
-    }
-    // execute all in one batch
-    executeQueriesInBatchNoCount(dbProduct, stmt, queryBatch, maxBatchSize);
-  }
-
-  private void updateKeyValueAssociatedWithTxn(CommitTxnRequest rqst, 
Statement stmt) throws SQLException {
-    if (!rqst.getKeyValue().getKey().startsWith(TxnStore.TXN_KEY_START)) {
-      String errorMsg = "Error updating key/value in the sql backend with"
-          + " txnId=" + rqst.getTxnid() + ","
-          + " tableId=" + rqst.getKeyValue().getTableId() + ","
-          + " key=" + rqst.getKeyValue().getKey() + ","
-          + " value=" + rqst.getKeyValue().getValue() + "."
-          + " key should start with " + TXN_KEY_START + ".";
-      LOG.warn(errorMsg);
-      throw new IllegalArgumentException(errorMsg);
-    }
-    String s = "UPDATE \"TABLE_PARAMS\" SET"
-        + " \"PARAM_VALUE\" = " + quoteString(rqst.getKeyValue().getValue())
-        + " WHERE \"TBL_ID\" = " + rqst.getKeyValue().getTableId()
-        + " AND \"PARAM_KEY\" = " + quoteString(rqst.getKeyValue().getKey());
-    LOG.debug("Going to execute update <{}>", s);
-    int affectedRows = stmt.executeUpdate(s);
-    if (affectedRows != 1) {
-      String errorMsg = "Error updating key/value in the sql backend with"
-          + " txnId=" + rqst.getTxnid() + ","
-          + " tableId=" + rqst.getKeyValue().getTableId() + ","
-          + " key=" + rqst.getKeyValue().getKey() + ","
-          + " value=" + rqst.getKeyValue().getValue() + "."
-          + " Only one row should have been affected but "
-          + affectedRows + " rows where affected.";
-      LOG.warn(errorMsg);
-      throw new IllegalStateException(errorMsg);
+      return sqlRetryHandler.executeWithRetry(
+          new SqlRetryCallProperties().withCallerId("GetTxnDbsUpdatedHandler"),
+          () -> jdbcResource.execute(new GetTxnDbsUpdatedHandler(txnId)));
+    } catch (MetaException e) {
+      throw e;
+    } catch (TException e) {
+      throw new MetaException(e.getMessage());
     }
   }
 
   /**
-   * Replicate Table Write Ids state to mark aborted write ids and writeid 
high water mark.
+   * Replicate Table Write Ids state to mark aborted write ids and writeid 
high watermark.
    * @param rqst info on table/partitions and writeid snapshot to replicate.
    * @throws MetaException
    */
   @Override
   @RetrySemantics.Idempotent("No-op if already replicated the writeid state")
   public void replTableWriteIdState(ReplTblWriteIdStateRequest rqst) throws 
MetaException {
-    String dbName = rqst.getDbName().toLowerCase();
-    String tblName = rqst.getTableName().toLowerCase();
-    ValidWriteIdList validWriteIdList = new 
ValidReaderWriteIdList(rqst.getValidWriteIdlist());
-
-    // Get the abortedWriteIds which are already sorted in ascending order.
-    List<Long> abortedWriteIds = getAbortedWriteIds(validWriteIdList);
-    int numAbortedWrites = abortedWriteIds.size();
-    try {
-      Connection dbConn = null;
-      Statement stmt = null;
-      PreparedStatement pStmt = null;
-      List<PreparedStatement> insertPreparedStmts = null;
-      ResultSet rs = null;
-      List<String> params = Arrays.asList(dbName, tblName);
-      try {
-        lockInternal();
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-
-        // Check if this txn state is already replicated for this given table. 
If yes, then it is
-        // idempotent case and just return.
-        String sql = "SELECT \"NWI_NEXT\" FROM \"NEXT_WRITE_ID\" WHERE 
\"NWI_DATABASE\" = ? AND \"NWI_TABLE\" = ?";
-        pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, sql, params);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Going to execute query <" + sql.replace("?", "{}") + ">",
-              quoteString(dbName), quoteString(tblName));
-        }
-        rs = pStmt.executeQuery();
-        if (rs.next()) {
-          LOG.info("Idempotent flow: WriteId state <{}> is already applied for 
the table: {}.{}", validWriteIdList,
-              dbName, tblName);
-          rollbackDBConn(dbConn);
-          return;
-        }
-
-        if (numAbortedWrites > 0) {
-          // Allocate/Map one txn per aborted writeId and abort the txn to 
mark writeid as aborted.
-          // We don't use the txnLock, all of these transactions will be 
aborted in this one rdbm transaction
-          // So they will not effect the commitTxn in any way
-          List<Long> txnIds = openTxns(dbConn,
-                  new OpenTxnRequest(numAbortedWrites, rqst.getUser(), 
rqst.getHostName()));
-          assert(numAbortedWrites == txnIds.size());
-
-          // Map each aborted write id with each allocated txn.
-          List<String> rows = new ArrayList<>();
-          List<List<String>> paramsList = new ArrayList<>();
-          int i = 0;
-          for (long txn : txnIds) {
-            long writeId = abortedWriteIds.get(i++);
-            rows.add(txn + ", ?, ?, " + writeId);
-            paramsList.add(params);
-            LOG.info("Allocated writeID: {} for txnId: {}", writeId, txn);
-          }
+    new ReplTableWriteIdStateFunction(rqst, mutexAPI, 
transactionalListeners).execute(jdbcResource);
+  }
 
-          // Insert entries to TXN_TO_WRITE_ID for aborted write ids
-          insertPreparedStmts = 
sqlGenerator.createInsertValuesPreparedStmt(dbConn,
-                  "\"TXN_TO_WRITE_ID\" (\"T2W_TXNID\", \"T2W_DATABASE\", 
\"T2W_TABLE\", \"T2W_WRITEID\")", rows,
-                  paramsList);
-          for (PreparedStatement pst : insertPreparedStmts) {
-            pst.execute();
-          }
+  @Override
+  @RetrySemantics.ReadOnly
+  public GetValidWriteIdsResponse getValidWriteIds(GetValidWriteIdsRequest 
rqst) throws MetaException {
+    return new GetValidWriteIdsFunction(rqst, 
openTxnTimeOutMillis).execute(jdbcResource);
+  }
+  
+  @Override
+  @RetrySemantics.Idempotent
+  public AllocateTableWriteIdsResponse 
allocateTableWriteIds(AllocateTableWriteIdsRequest rqst) throws MetaException {
+    return new AllocateTableWriteIdsFunction(rqst, 
transactionalListeners).execute(jdbcResource);
+  }
 
-          // Abort all the allocated txns so that the mapped write ids are 
referred as aborted ones.
-          int numAborts = abortTxns(dbConn, txnIds, false, false, 
TxnErrorMsg.ABORT_REPL_WRITEID_TXN);
-          assert(numAborts == numAbortedWrites);
-        }
+  @Override
+  public MaxAllocatedTableWriteIdResponse 
getMaxAllocatedTableWrited(MaxAllocatedTableWriteIdRequest rqst) throws 
MetaException {
+    return jdbcResource.execute(new GetMaxAllocatedTableWriteIdHandler(rqst));
+  }
 
-        // There are some txns in the list which has no write id allocated and 
hence go ahead and do it.
-        // Get the next write id for the given table and update it with new 
next write id.
-        // It is expected NEXT_WRITE_ID doesn't have entry for this table and 
hence directly insert it.
-        long nextWriteId = validWriteIdList.getHighWatermark() + 1;
+  @Override
+  public void seedWriteId(SeedTableWriteIdsRequest rqst) throws MetaException {
+    //since this is on conversion from non-acid to acid, NEXT_WRITE_ID should 
not have an entry
+    //for this table.  It also has a unique index in case 'should not' is 
violated
 
-        // First allocation of write id (hwm+1) should add the table to the 
next_write_id meta table.
-        sql = "INSERT INTO \"NEXT_WRITE_ID\" (\"NWI_DATABASE\", \"NWI_TABLE\", 
\"NWI_NEXT\") VALUES (?, ?, "
-                + Long.toString(nextWriteId) + ")";
-        closeStmt(pStmt);
-        pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, sql, params);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Going to execute insert <" + sql.replace("?", "{}") + ">",
-              quoteString(dbName), quoteString(tblName));
-        }
-        pStmt.execute();
+    // First allocation of write id should add the table to the next_write_id 
meta table
+    // The initial value for write id should be 1 and hence we add 1 with 
number of write ids
+    // allocated here
+    jdbcResource.getJdbcTemplate().update(
+        "INSERT INTO \"NEXT_WRITE_ID\" (\"NWI_DATABASE\", \"NWI_TABLE\", 
\"NWI_NEXT\") VALUES (:db, :table, :writeId)",
+        new MapSqlParameterSource()
+            .addValue("db", rqst.getDbName())
+            .addValue("table", rqst.getTableName())
+            .addValue("writeId", rqst.getSeedWriteId() + 1));
+  }
 
-        LOG.info("WriteId state <{}> is applied for the table: {}.{}", 
validWriteIdList, dbName, tblName);
-        LOG.debug("Going to commit");
-        dbConn.commit();
-      } catch (SQLException e) {
-        LOG.debug("Going to rollback: ", e);
-        rollbackDBConn(dbConn);
-        checkRetryable(e, "replTableWriteIdState(" + rqst + ")", true);
-        throw new MetaException("Unable to update transaction database "
-                + StringUtils.stringifyException(e));
-      } finally {
-        if (insertPreparedStmts != null) {
-          for (PreparedStatement pst : insertPreparedStmts) {
-            closeStmt(pst);
-          }
-        }
-        closeStmt(pStmt);
-        close(rs, stmt, dbConn);
-        unlockInternal();
-      }
-    } catch (RetryException e) {
-      replTableWriteIdState(rqst);
+  @Override
+  public void seedTxnId(SeedTxnIdRequest rqst) throws MetaException {
+    /*
+     * Locking the txnLock an exclusive way, we do not want to set the txnId 
backward accidentally
+     * if there are concurrent open transactions
+     */
+    acquireTxnLock(false);
+    long highWaterMark = jdbcResource.execute(new GetHighWaterMarkHandler());
+    if (highWaterMark >= rqst.getSeedTxnId()) {
+      throw new MetaException(MessageFormat
+          .format("Invalid txnId seed {}, the highWaterMark is {}", 
rqst.getSeedTxnId(), highWaterMark));
     }
+    jdbcResource.getJdbcTemplate().getJdbcTemplate().execute((Statement stmt) 
-> stmt.execute(dbProduct.getTxnSeedFn(rqst.getSeedTxnId())));

Review Comment:
   too long



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to