veghlaci05 commented on code in PR #4384:
URL: https://github.com/apache/hive/pull/4384#discussion_r1309917798


##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java:
##########
@@ -289,109 +125,12 @@ public CompactionInfo findNextToCompact(String workerId) 
throws MetaException {
    */
   @Override
   @RetrySemantics.SafeToRetry
-  @SuppressWarnings("squid:S2095")
   public CompactionInfo findNextToCompact(FindNextCompactRequest rqst) throws 
MetaException {
-    try {
-      if (rqst == null) {
-        throw new MetaException("FindNextCompactRequest is null");
-      }
-
-      Connection dbConn = null;
-      PreparedStatement stmt = null;
-      //need a separate stmt for executeUpdate() otherwise it will close the 
ResultSet(HIVE-12725)
-      Statement updStmt = null;
-      ResultSet rs = null;
-
-      long poolTimeout = MetastoreConf.getTimeVar(conf, 
ConfVars.COMPACTOR_WORKER_POOL_TIMEOUT, TimeUnit.MILLISECONDS);
-
-      try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, 
connPoolCompaction);
-        StringBuilder sb = new StringBuilder();
-        sb.append("SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", 
\"CQ_PARTITION\", " +
-          "\"CQ_TYPE\", \"CQ_POOL_NAME\", \"CQ_NUMBER_OF_BUCKETS\", 
\"CQ_ORDER_BY\", " +
-          "\"CQ_TBLPROPERTIES\" FROM \"COMPACTION_QUEUE\" WHERE \"CQ_STATE\" = 
" + quoteChar(INITIATED_STATE) + " AND ");
-        boolean hasPoolName = StringUtils.isNotBlank(rqst.getPoolName());
-        if(hasPoolName) {
-          sb.append("\"CQ_POOL_NAME\"=?");
-        } else {
-          sb.append("\"CQ_POOL_NAME\" IS NULL OR  \"CQ_ENQUEUE_TIME\" < (")
-            .append(getEpochFn(dbProduct)).append(" - 
").append(poolTimeout).append(")");
-        }
-        sb.append(" ORDER BY \"CQ_ID\" ASC");
-        String query = sb.toString();
-        stmt = dbConn.prepareStatement(query);
-        if (hasPoolName) {
-          stmt.setString(1, rqst.getPoolName());
-        }
-
-        LOG.debug("Going to execute query <{}>", query);
-        rs = stmt.executeQuery();
-        if (!rs.next()) {
-          LOG.debug("No compactions found ready to compact");
-          dbConn.rollback();
-          return null;
-        }
-        updStmt = dbConn.createStatement();
-        do {
-          CompactionInfo info = new CompactionInfo();
-          info.id = rs.getLong(1);
-          info.dbname = rs.getString(2);
-          info.tableName = rs.getString(3);
-          info.partName = rs.getString(4);
-          info.type = 
TxnUtils.dbCompactionType2ThriftType(rs.getString(5).charAt(0));
-          info.poolName = rs.getString(6);
-          info.numberOfBuckets = rs.getInt(7);
-          info.orderByClause = rs.getString(8);
-          info.properties = rs.getString(9);
-          info.workerId = rqst.getWorkerId();
-
-          String workerId = rqst.getWorkerId();
-          String workerVersion = rqst.getWorkerVersion();
-          String workerIdSqlValue = (workerId == null) ? "NULL" : ("'" + 
workerId + "'");
-
-          // Now, update this record as being worked on by this worker.
-          long now = getDbTime(dbConn);
-          query = "" +
-              "UPDATE " +
-              "  \"COMPACTION_QUEUE\" " +
-              "SET " +
-              " \"CQ_WORKER_ID\" = " + workerIdSqlValue + ", " +
-              " \"CQ_WORKER_VERSION\" = '" + workerVersion + "', " +
-              " \"CQ_START\" = " + now + ", " +
-              " \"CQ_STATE\" = '" + WORKING_STATE + "' " +
-              "WHERE \"CQ_ID\" = " + info.id +
-              "  AND \"CQ_STATE\"='" + INITIATED_STATE + "'";
-
-          LOG.debug("Going to execute update <{}>", query);
-          int updCount = updStmt.executeUpdate(query);
-          if(updCount == 1) {
-            dbConn.commit();
-            return info;
-          }
-          if(updCount == 0) {
-            LOG.debug("Worker {} (version: {}) picked up {}", workerId, 
workerVersion, info);
-            continue;
-          }
-          LOG.error("Unable to set to cq_state={} for compaction record: {}. 
updCnt={}. workerId={}. workerVersion={}",
-              WORKING_STATE, info, updCount, workerId, workerVersion);
-          dbConn.rollback();
-          return null;
-        } while( rs.next());
-        dbConn.rollback();
-        return null;
-      } catch (SQLException e) {
-        LOG.error("Unable to select next element for compaction, " + 
e.getMessage());
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(e, "findNextToCompact(rqst:" + rqst + ")");
-        throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
-      } finally {
-        closeStmt(updStmt);
-        close(rs, stmt, dbConn);
-      }
-    } catch (RetryException e) {
-      return findNextToCompact(rqst);
+    if (rqst == null) {
+      throw new MetaException("FindNextCompactRequest is null");
     }
+    long poolTimeout = MetastoreConf.getTimeVar(conf, 
ConfVars.COMPACTOR_WORKER_POOL_TIMEOUT, TimeUnit.MILLISECONDS);

Review Comment:
   This timeout is specific to this query.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to