zabetak commented on code in PR #4859:
URL: https://github.com/apache/hive/pull/4859#discussion_r1482918569
##########
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java:
##########
@@ -648,6 +646,52 @@ public void secondCompactionShouldBeRefusedBeforeEnqueueing() throws Exception {
     Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
   }
+  @Test
+  public void secondCompactionShouldBeRefusedBeforeEnqueueingForPartition() throws Exception {
+    conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
+
+    final String dbName = "default";
+    final String tableName = "compaction_test";
+    executeStatementOnDriver("drop table if exists " + tableName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tableName + "(id string, value string) partitioned by(pt string) CLUSTERED BY(id) "
+        + "INTO 10 BUCKETS STORED AS ORC TBLPROPERTIES('transactional'='true')", driver);
+    executeStatementOnDriver("alter table " + tableName + " add partition(pt='test')",driver);
+    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " partition(pt='test') values ('1','one'),('2','two'),('3','three'),"
+        + "('4','four'),('5','five'),('6','six'),('7','seven'),('8','eight'),('9','nine'),('10','ten'),"
+        + "('11','eleven'),('12','twelve'),('13','thirteen'),('14','fourteen'),('15','fifteen'),('16','sixteen'),"
+        + "('17','seventeen'),('18','eighteen'),('19','nineteen'),('20','twenty')", driver);
+
+    executeStatementOnDriver("insert into " + tableName + " partition(pt='test') values ('21', 'value21'),('84', 'value84'),"
+        + "('66', 'value66'),('54', 'value54')", driver);
+    executeStatementOnDriver(
+        "insert into " + tableName + " partition(pt='test') values ('22', 'value22'),('34', 'value34')," + "('35', 'value35')", driver);
+    executeStatementOnDriver("insert into " + tableName + " partition(pt='test') values ('75', 'value75'),('99', 'value99')", driver);
+
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+
+    //Do a compaction directly and wait for it to finish
+    CompactionRequest rqst = new CompactionRequest(dbName, tableName, CompactionType.MAJOR);
+    rqst.setPartitionname("pt=test");
+    CompactionResponse resp = txnHandler.compact(rqst);
+    runWorker(conf);
+
+    //Try to do a second compaction on the same table before the cleaner runs.
+    try {
+      driver.run("ALTER TABLE " + tableName + " partition(pt='test') COMPACT 'major'");
Review Comment:
This test is mostly a copy of the previous one, and I'm not sure we need all of this. The main thing we wanted to test is the exception message when a partition is set in the request. To do that it would suffice to add this try-catch block to the previous test, or to rework this test and keep only the necessary parts.
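A minimal sketch of the addition the comment is suggesting, assuming the target test works against a partitioned table via the usual `tableName`/`driver` fields and that the refusal surfaces as an exception from `driver.run`; the asserted message fragment below is a placeholder, not the PR's actual error text:

    // Hypothetical try-catch to bolt onto the existing test after runWorker(conf),
    // before the cleaner removes the "ready for cleaning" entry: a second request
    // for the same partition should be refused with a descriptive message.
    try {
      driver.run("ALTER TABLE " + tableName + " partition(pt='test') COMPACT 'major'");
      Assert.fail("Second compaction request should have been refused before enqueueing");
    } catch (Exception expected) {
      // Placeholder check: the refusal message should identify the partition from the request.
      Assert.assertTrue(expected.getMessage(), expected.getMessage().contains("pt=test"));
    }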
##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/InitiatorBase.java:
##########
@@ -50,59 +49,53 @@
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
public class InitiatorBase extends MetaStoreCompactorThread {
   static final private String COMPACTOR_THRESHOLD_PREFIX = "compactorthreshold.";
-  private List<CompactionResponse> initiateCompactionForMultiplePartitions(Table table,
-      Map<String, Partition> partitions, CompactionRequest request) {
-    List<CompactionResponse> compactionResponses = new ArrayList<>();
-    partitions.entrySet().parallelStream().forEach(entry -> {
-      try {
-        StorageDescriptor sd = CompactorUtil.resolveStorageDescriptor(table, entry.getValue());
-        String runAs = TxnUtils.findUserToRunAs(sd.getLocation(), table, conf);
-        CompactionInfo ci =
-            new CompactionInfo(table.getDbName(), table.getTableName(), entry.getKey(), request.getType());
-        ci.initiatorId = request.getInitiatorId();
-        ci.orderByClause = request.getOrderByClause();
-        ci.initiatorVersion = request.getInitiatorVersion();
-        if (request.getNumberOfBuckets() > 0) {
-          ci.numberOfBuckets = request.getNumberOfBuckets();
-        }
-        ci.poolName = request.getPoolName();
-        LOG.info(
-            "Checking to see if we should compact partition " + entry.getKey() + " of table " + table.getDbName() + "."
-                + table.getTableName());
-        CollectionUtils.addIgnoreNull(compactionResponses,
-            scheduleCompactionIfRequired(ci, table, entry.getValue(), runAs, false));
-      } catch (IOException | InterruptedException | MetaException e) {
-        LOG.error(
-            "Error occurred while Checking if we should compact partition " + entry.getKey() + " of table " + table.getDbName() + "."
-                + table.getTableName() + " Exception: " + e.getMessage());
-        throw new RuntimeException(e);
-      }
-    });
-    return compactionResponses;
-  }
-
-  public List<CompactionResponse> initiateCompactionForTable(CompactionRequest request, Table table, Map<String, Partition> partitions) throws Exception {
+  public void initialize() throws Exception {
+    super.init(new AtomicBoolean());
Review Comment:
It's quite strange to introduce a new initializer method whose name roughly matches a method of its superclass, and which calls that superclass method but doesn't override it.
For callers of `InitiatorBase` it would be hard to figure out when they should call `init` and when they should call `initialize`. Moreover, it is unclear how subclasses should deal with these two methods.
Is it possible to make this an override, or would that risk breaking the subclasses?
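For reference, a rough sketch of the override being asked about, with the `init(AtomicBoolean)` signature inferred from the `super.init(new AtomicBoolean())` call in this diff; whether this compiles against `MetaStoreCompactorThread` and keeps subclasses working is exactly the open question:

    // Hypothetical alternative: fold initialize() into an override of the superclass
    // initializer instead of adding a second, similarly named entry point.
    @Override
    public void init(AtomicBoolean stop) throws Exception {
      super.init(stop);
      // InitiatorBase-specific setup, if any, would go here.
    }

Callers such as `AlterTableCompactOperation` could then keep calling `init(new AtomicBoolean())` as they did before this change.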
##########
ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java:
##########
@@ -69,40 +79,48 @@ public AlterTableCompactOperation(DDLOperationContext context, AlterTableCompact
       compactionRequest.setNumberOfBuckets(desc.getNumberOfBuckets());
     }
-    InitiatorBase initiatorBase = new InitiatorBase();
-    initiatorBase.setConf(context.getConf());
-    initiatorBase.init(new AtomicBoolean());
-
-    Map<String, org.apache.hadoop.hive.metastore.api.Partition> partitionMap =
-        convertPartitionsFromThriftToDB(getPartitions(table, desc, context));
-
-    if(desc.getPartitionSpec() != null){
-      Optional<String> partitionName = partitionMap.keySet().stream().findFirst();
-      partitionName.ifPresent(compactionRequest::setPartitionname);
-    }
-    List<CompactionResponse> compactionResponses =
-        initiatorBase.initiateCompactionForTable(compactionRequest, table.getTTable(), partitionMap);
-    for (CompactionResponse compactionResponse : compactionResponses) {
-      if (!compactionResponse.isAccepted()) {
-        String message;
-        if (compactionResponse.isSetErrormessage()) {
-          message = compactionResponse.getErrormessage();
-          throw new HiveException(ErrorMsg.COMPACTION_REFUSED, table.getDbName(), table.getTableName(),
-              "CompactionId: " + compactionResponse.getId(), message);
-        }
-        context.getConsole().printInfo(
-            "Compaction already enqueued with id " + compactionResponse.getId() + "; State is "
-                + compactionResponse.getState());
-        continue;
+    //Will directly initiate compaction if an un-partitioned table/a partition is specified in the request
+    if (desc.getPartitionSpec() != null || !table.isPartitioned()) {
+      if (desc.getPartitionSpec() != null) {
+        Optional<String> partitionName = partitionMap.keySet().stream().findFirst();
+        partitionName.ifPresent(compactionRequest::setPartitionname);
       }
-      context.getConsole().printInfo("Compaction enqueued with id " + compactionResponse.getId());
-      if (desc.isBlocking() && compactionResponse.isAccepted()) {
-        waitForCompactionToFinish(compactionResponse, context);
+      CompactionResponse compactionResponse = initiatorBase.initiateCompactionForTable(compactionRequest);
Review Comment:
OK, thanks for the explanation! It makes sense to keep the current behavior. If for whatever reason we decide to change this, it can be done in another PR.
##########
ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java:
##########
@@ -56,6 +59,13 @@ public AlterTableCompactOperation(DDLOperationContext context, AlterTableCompact
       throw new HiveException(ErrorMsg.NONACID_COMPACTION_NOT_SUPPORTED, table.getDbName(), table.getTableName());
     }
+    Map<String, org.apache.hadoop.hive.metastore.api.Partition> partitionMap =
+        convertPartitionsFromThriftToDB(getPartitions(table, desc, context));
+
+    InitiatorBase initiatorBase = new InitiatorBase();
+    initiatorBase.setConf(context.getConf());
+    initiatorBase.initialize();
+
Review Comment:
Is there any particular reason why this block was moved up to this earlier point in the class?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]