deniskuzZ commented on code in PR #4384:
URL: https://github.com/apache/hive/pull/4384#discussion_r1293099638
##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java:
##########
@@ -58,114 +68,18 @@
* methods are not available through the thrift interface.
*/
class CompactionTxnHandler extends TxnHandler {
- private static final String CLASS_NAME =
CompactionTxnHandler.class.getName();
- private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
- private static final String DB_FAILED_TO_CONNECT = "Unable to connect to
transaction database: ";
-
- private static DataSource connPoolCompaction;
-
- private static final String SELECT_COMPACTION_QUEUE_BY_TXN_ID =
- "SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", "
- + "\"CQ_STATE\", \"CQ_TYPE\", \"CQ_TBLPROPERTIES\",
\"CQ_WORKER_ID\", \"CQ_START\", \"CQ_RUN_AS\", "
- + "\"CQ_HIGHEST_WRITE_ID\", \"CQ_META_INFO\", \"CQ_HADOOP_JOB_ID\",
\"CQ_ERROR_MESSAGE\", "
- + "\"CQ_ENQUEUE_TIME\", \"CQ_WORKER_VERSION\", \"CQ_INITIATOR_ID\",
\"CQ_INITIATOR_VERSION\", "
- + "\"CQ_RETRY_RETENTION\", \"CQ_NEXT_TXN_ID\", \"CQ_TXN_ID\",
\"CQ_COMMIT_TIME\", \"CQ_POOL_NAME\", "
- + "\"CQ_NUMBER_OF_BUCKETS\", \"CQ_ORDER_BY\" "
- + "FROM \"COMPACTION_QUEUE\" WHERE \"CQ_TXN_ID\" = ?";
- private static final String SELECT_COMPACTION_METRICS_CACHE_QUERY =
- "SELECT \"CMC_METRIC_VALUE\", \"CMC_VERSION\" FROM
\"COMPACTION_METRICS_CACHE\" " +
- "WHERE \"CMC_DATABASE\" = ? AND \"CMC_TABLE\" = ? AND
\"CMC_METRIC_TYPE\" = ?";
- private static final String
NO_SELECT_COMPACTION_METRICS_CACHE_FOR_TYPE_QUERY =
- "\"CMC_DATABASE\", \"CMC_TABLE\", \"CMC_PARTITION\",
\"CMC_METRIC_VALUE\", \"CMC_VERSION\" FROM " +
- "\"COMPACTION_METRICS_CACHE\" WHERE \"CMC_METRIC_TYPE\" = ? ORDER BY
\"CMC_METRIC_VALUE\" DESC";
- private static final String UPDATE_COMPACTION_METRICS_CACHE_QUERY =
- "UPDATE \"COMPACTION_METRICS_CACHE\" SET \"CMC_METRIC_VALUE\" = ?,
\"CMC_VERSION\" = ? " +
- "WHERE \"CMC_DATABASE\" = ? AND \"CMC_TABLE\" = ? AND
\"CMC_METRIC_TYPE\" = ? " +
- "AND \"CMC_VERSION\" = ?";
- private static final String INSERT_COMPACTION_METRICS_CACHE_QUERY =
- "INSERT INTO \"COMPACTION_METRICS_CACHE\" ( " +
- "\"CMC_DATABASE\", \"CMC_TABLE\", \"CMC_PARTITION\",
\"CMC_METRIC_TYPE\", \"CMC_METRIC_VALUE\", " +
- "\"CMC_VERSION\" ) VALUES (?, ?, ?, ?, ?, ?)";
- private static final String DELETE_COMPACTION_METRICS_CACHE_QUERY =
- "DELETE FROM \"COMPACTION_METRICS_CACHE\" WHERE \"CMC_DATABASE\" = ? AND
\"CMC_TABLE\" = ? " +
- "AND \"CMC_METRIC_TYPE\" = ?";
-
- private static final String DELETE_FAILED_TXNS_SQL =
- "DELETE FROM \"TXNS\" WHERE \"TXN_ID\" NOT IN (SELECT \"TC_TXNID\" FROM
\"TXN_COMPONENTS\") " +
- "AND (\"TXN_STATE\" = " + TxnStatus.ABORTED + " OR \"TXN_STATE\" = "
+ TxnStatus.COMMITTED + ") " +
- "AND \"TXN_ID\" < ?";
-
- // Three inner sub-queries which are under left-join to fetch the required
data for aborted txns.
- private static final String SELECT_ABORTS_WITH_MIN_OPEN_WRITETXN_QUERY =
- " \"res1\".\"TC_DATABASE\" AS \"DB\", \"res1\".\"TC_TABLE\" AS
\"TBL\", \"res1\".\"TC_PARTITION\" AS \"PART\", " +
- " \"res1\".\"MIN_TXN_START_TIME\" AS \"MIN_TXN_START_TIME\",
\"res1\".\"ABORTED_TXN_COUNT\" AS \"ABORTED_TXN_COUNT\", " +
- " \"res2\".\"MIN_OPEN_WRITE_TXNID\" AS \"MIN_OPEN_WRITE_TXNID\",
\"res3\".\"RETRY_RETENTION\" AS \"RETRY_RETENTION\", " +
- " \"res3\".\"ID\" AS \"RETRY_CQ_ID\" " +
- " FROM " +
- // First sub-query - Gets the aborted txns with min txn start time,
number of aborted txns
- // for corresponding db, table, partition.
- " ( SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\",
MIN(\"TXN_STARTED\") AS \"MIN_TXN_START_TIME\", " +
- " COUNT(*) AS \"ABORTED_TXN_COUNT\" FROM \"TXNS\",
\"TXN_COMPONENTS\" " +
- " WHERE \"TXN_ID\" = \"TC_TXNID\" AND \"TXN_STATE\" = " +
TxnStatus.ABORTED +
- " GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" %s )
\"res1\" " +
- " LEFT JOIN" +
- // Second sub-query - Gets the min open txn id for corresponding db,
table, partition.
- "( SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\",
MIN(\"TC_TXNID\") AS \"MIN_OPEN_WRITE_TXNID\" " +
- " FROM \"TXNS\", \"TXN_COMPONENTS\" " +
- " WHERE \"TXN_ID\" = \"TC_TXNID\" AND \"TXN_STATE\" = " +
TxnStatus.OPEN +
- " GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" )
\"res2\"" +
- " ON \"res1\".\"TC_DATABASE\" = \"res2\".\"TC_DATABASE\"" +
- " AND \"res1\".\"TC_TABLE\" = \"res2\".\"TC_TABLE\"" +
- " AND (\"res1\".\"TC_PARTITION\" = \"res2\".\"TC_PARTITION\" " +
- " OR (\"res1\".\"TC_PARTITION\" IS NULL AND
\"res2\".\"TC_PARTITION\" IS NULL)) " +
- " LEFT JOIN " +
- // Third sub-query - Gets the retry entries for corresponding db,
table, partition.
- "( SELECT \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\",
MAX(\"CQ_ID\") AS \"ID\", " +
- " MAX(\"CQ_RETRY_RETENTION\") AS \"RETRY_RETENTION\", " +
- " MIN(\"CQ_COMMIT_TIME\") - %s + MAX(\"CQ_RETRY_RETENTION\") AS
\"RETRY_RECORD_CHECK\" FROM \"COMPACTION_QUEUE\" " +
- " WHERE \"CQ_TYPE\" = " + quoteChar(TxnStore.ABORT_TXN_CLEANUP_TYPE)
+
- " GROUP BY \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\") \"res3\"
" +
- " ON \"res1\".\"TC_DATABASE\" = \"res3\".\"CQ_DATABASE\" " +
- " AND \"res1\".\"TC_TABLE\" = \"res3\".\"CQ_TABLE\" " +
- " AND (\"res1\".\"TC_PARTITION\" = \"res3\".\"CQ_PARTITION\" " +
- " OR (\"res1\".\"TC_PARTITION\" IS NULL AND
\"res3\".\"CQ_PARTITION\" IS NULL))" +
- " WHERE \"res3\".\"RETRY_RECORD_CHECK\" <= 0 OR
\"res3\".\"RETRY_RECORD_CHECK\" IS NULL";
-
- private static final String DELETE_CQ_ENTRIES = "DELETE FROM
\"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?";
-
- private static final String DELETE_CQ_AND_ABORT_ENTRIES = DELETE_CQ_ENTRIES +
- " OR ( \"CQ_DATABASE\" = ? AND \"CQ_TABLE\" = ? " +
- " AND \"CQ_PARTITION\" %s AND \"CQ_TYPE\" = " +
- quoteChar(TxnStore.ABORT_TXN_CLEANUP_TYPE) + " )";
-
- private static final String INSERT_ABORT_RETRY_ENTRY_INTO_COMPACTION_QUEUE =
- "INSERT INTO \"COMPACTION_QUEUE\" (\"CQ_ID\", \"CQ_DATABASE\",
\"CQ_TABLE\", \"CQ_PARTITION\", " +
- " \"CQ_TYPE\", \"CQ_STATE\", \"CQ_RETRY_RETENTION\",
\"CQ_ERROR_MESSAGE\", \"CQ_COMMIT_TIME\") " +
- " VALUES (?, ?, ?, ?, ?, ?, ?, ?, %s)";
+
+ private static final Logger LOG =
LoggerFactory.getLogger(CompactionTxnHandler.class.getName());
public CompactionTxnHandler() {
}
- @Override
- public void setConf(Configuration conf) {
- super.setConf(conf);
- synchronized (CompactionTxnHandler.class) {
- if (connPoolCompaction == null) {
- int maxPoolSize = MetastoreConf.getIntVar(conf,
ConfVars.HIVE_COMPACTOR_CONNECTION_POOLING_MAX_CONNECTIONS);
Review Comment:
Where do you set the connection pool for compaction activities?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]