deniskuzZ commented on code in PR #4384:
URL: https://github.com/apache/hive/pull/4384#discussion_r1309004327
##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java:
##########
@@ -678,316 +215,38 @@ private void setCleanerStart(Connection dbConn, CompactionInfo info, Long timest
@RetrySemantics.CannotRetry
public void markCleaned(CompactionInfo info) throws MetaException {
LOG.debug("Running markCleaned with CompactionInfo: {}", info);
- try {
- Connection dbConn = null;
- PreparedStatement pStmt = null;
- ResultSet rs = null;
- try {
- dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolCompaction);
- if (!info.isAbortedTxnCleanup()) {
- String s = "INSERT INTO \"COMPLETED_COMPACTIONS\"(\"CC_ID\",
\"CC_DATABASE\", "
- + "\"CC_TABLE\", \"CC_PARTITION\", \"CC_STATE\", \"CC_TYPE\",
\"CC_TBLPROPERTIES\", \"CC_WORKER_ID\", "
- + "\"CC_START\", \"CC_END\", \"CC_RUN_AS\",
\"CC_HIGHEST_WRITE_ID\", \"CC_META_INFO\", "
- + "\"CC_HADOOP_JOB_ID\", \"CC_ERROR_MESSAGE\",
\"CC_ENQUEUE_TIME\", "
- + "\"CC_WORKER_VERSION\", \"CC_INITIATOR_ID\",
\"CC_INITIATOR_VERSION\", "
- + "\"CC_NEXT_TXN_ID\", \"CC_TXN_ID\", \"CC_COMMIT_TIME\",
\"CC_POOL_NAME\", \"CC_NUMBER_OF_BUCKETS\","
- + "\"CC_ORDER_BY\") "
- + "SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\",
\"CQ_PARTITION\", "
- + quoteChar(SUCCEEDED_STATE) + ", \"CQ_TYPE\",
\"CQ_TBLPROPERTIES\", \"CQ_WORKER_ID\", \"CQ_START\", "
- + getEpochFn(dbProduct) + ", \"CQ_RUN_AS\",
\"CQ_HIGHEST_WRITE_ID\", \"CQ_META_INFO\", "
- + "\"CQ_HADOOP_JOB_ID\", \"CQ_ERROR_MESSAGE\",
\"CQ_ENQUEUE_TIME\", "
- + "\"CQ_WORKER_VERSION\", \"CQ_INITIATOR_ID\",
\"CQ_INITIATOR_VERSION\", "
- + "\"CQ_NEXT_TXN_ID\", \"CQ_TXN_ID\", \"CQ_COMMIT_TIME\",
\"CQ_POOL_NAME\", \"CQ_NUMBER_OF_BUCKETS\", "
- + "\"CQ_ORDER_BY\" "
- + "FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?";
- pStmt = dbConn.prepareStatement(s);
- pStmt.setLong(1, info.id);
- LOG.debug("Going to execute update <{}> for CQ_ID={}", s, info.id);
- pStmt.executeUpdate();
- }
-
- /* Remove compaction queue record corresponding to the compaction which has been successful as well as
- * remove all abort retry associated metadata of table/partition in the COMPACTION_QUEUE both when compaction
- * or abort cleanup is successful. We don't want a situation wherein we have an abort retry entry for a table
- * but no corresponding entry in TXN_COMPONENTS table. Successful compaction will delete
- * the retry metadata, so that abort cleanup is retried again (an optimistic retry approach).
- */
- removeCompactionAndAbortRetryEntries(dbConn, info);
-
- if (!info.isAbortedTxnCleanup()) {
- // Remove entries from completed_txn_components as well, so we don't start looking there
- // again but only up to the highest write ID included in this compaction job.
- //highestWriteId will be NULL in upgrade scenarios
- String query =
- "DELETE FROM \"COMPLETED_TXN_COMPONENTS\" " +
- " WHERE \"CTC_DATABASE\" = ? AND \"CTC_TABLE\" = ?";
- if (info.partName != null) {
- query += " AND \"CTC_PARTITION\" = ?";
- }
- if (info.highestWriteId != 0) {
- query += " AND \"CTC_WRITEID\" <= ?";
- }
- pStmt = dbConn.prepareStatement(query);
- int paramCount = 1;
- pStmt.setString(paramCount++, info.dbname);
- pStmt.setString(paramCount++, info.tableName);
- if (info.partName != null) {
- pStmt.setString(paramCount++, info.partName);
- }
- if (info.highestWriteId != 0) {
- pStmt.setLong(paramCount, info.highestWriteId);
- }
- LOG.debug("Going to execute update <{}>", query);
- int updCount = pStmt.executeUpdate();
- if (updCount < 1) {
- LOG.warn("Expected to remove at least one row from
completed_txn_components when " +
- "marking compaction entry as clean!");
- }
- LOG.debug("Removed {} records from completed_txn_components",
updCount);
- }
-
- // Do cleanup of metadata in TXN_COMPONENTS table.
- removeTxnComponents(dbConn, info);
- LOG.debug("Going to commit");
- dbConn.commit();
- } catch (SQLException e) {
- LOG.error("Unable to delete from compaction queue " + e.getMessage());
- LOG.debug("Going to rollback");
- rollbackDBConn(dbConn);
- checkRetryable(e, "markCleaned(" + info + ")");
- throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
- } finally {
- close(rs, pStmt, dbConn);
- }
- } catch (RetryException e) {
- markCleaned(info);
- }
- }
-
- private void removeTxnComponents(Connection dbConn, CompactionInfo info) throws MetaException, RetryException {
- PreparedStatement pStmt = null;
- ResultSet rs = null;
- try {
- /*
- * compaction may remove data from aborted txns above tc_writeid but it only guarantees to
- * remove it up to (inclusive) tc_writeid, so it's critical to not remove metadata about
- * aborted TXN_COMPONENTS above tc_writeid (and consequently about aborted txns).
- * See {@link ql.txn.compactor.Cleaner.removeFiles()}
- */
- String s = "DELETE FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\" IN ( "
- + "SELECT \"TXN_ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = " +
TxnStatus.ABORTED + ") "
- + "AND \"TC_DATABASE\" = ? AND \"TC_TABLE\" = ? "
- + "AND \"TC_PARTITION\" " + TxnUtils.nvl(info.partName);
-
- List<String> queries = new ArrayList<>();
- Iterator<Long> writeIdsIter = null;
- List<Integer> counts = null;
-
- if (info.writeIds != null && !info.writeIds.isEmpty()) {
- StringBuilder prefix = new StringBuilder(s).append(" AND ");
- List<String> questions = Collections.nCopies(info.writeIds.size(), "?");
-
- counts = TxnUtils.buildQueryWithINClauseStrings(conf, queries, prefix,
- new StringBuilder(), questions, "\"TC_WRITEID\"", false, false);
- writeIdsIter = info.writeIds.iterator();
- } else if (!info.hasUncompactedAborts) {
- if (info.highestWriteId != 0) {
- s += " AND \"TC_WRITEID\" <= ?";
- }
- queries.add(s);
- }
-
- for (int i = 0; i < queries.size(); i++) {
- String query = queries.get(i);
- int writeIdCount = (counts != null) ? counts.get(i) : 0;
-
- LOG.debug("Going to execute update <{}>", query);
- pStmt = dbConn.prepareStatement(query);
- int paramCount = 1;
-
- pStmt.setString(paramCount++, info.dbname);
- pStmt.setString(paramCount++, info.tableName);
- if (info.partName != null) {
- pStmt.setString(paramCount++, info.partName);
- }
- if (info.highestWriteId != 0 && writeIdCount == 0) {
- pStmt.setLong(paramCount, info.highestWriteId);
- }
- for (int j = 0; j < writeIdCount; j++) {
- if (writeIdsIter.hasNext()) {
- pStmt.setLong(paramCount + j, writeIdsIter.next());
- }
- }
- int rc = pStmt.executeUpdate();
- LOG.debug("Removed {} records from txn_components", rc);
- }
- } catch (SQLException e) {
- LOG.error("Unable to delete from txn components due to {}",
e.getMessage());
- LOG.debug("Going to rollback");
- rollbackDBConn(dbConn);
- checkRetryable(e, "markCleanedForAborts(" + info + ")");
- throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
- } finally {
- close(rs);
- closeStmt(pStmt);
- }
+ new MarkCleanedFunction(info, dbProduct, conf).execute(jdbcTemplate);
Review Comment:
Why do you need to pass the product? Doesn't the wrapper already have it?
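
For illustration, a rough sketch of the kind of change I have in mind, assuming the wrapper passed to execute() can expose the product. The class and accessor names used below (TransactionalFunction, MultiDataSourceJdbcResource, getDatabaseProduct()) are guesses for the sketch, not necessarily the actual API:

    // Sketch only: drop dbProduct from the constructor and resolve it from the wrapper.
    public class MarkCleanedFunction implements TransactionalFunction<Void> {

      private final CompactionInfo info;
      private final Configuration conf;

      public MarkCleanedFunction(CompactionInfo info, Configuration conf) {
        this.info = info;
        this.conf = conf;
      }

      @Override
      public Void execute(MultiDataSourceJdbcResource jdbcResource) throws MetaException {
        // Assumed accessor: take the product from the wrapper instead of a constructor argument,
        // e.g. for building the epoch expression used in the INSERT into COMPLETED_COMPACTIONS.
        DatabaseProduct dbProduct = jdbcResource.getDatabaseProduct();
        String epochExpr = getEpochFn(dbProduct);
        // ... rest of the markCleaned logic, running its statements through jdbcResource ...
        return null;
      }
    }

The call site in CompactionTxnHandler would then shrink to new MarkCleanedFunction(info, conf).execute(jdbcTemplate).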
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]