tarak271 commented on code in PR #4859:
URL: https://github.com/apache/hive/pull/4859#discussion_r1431320684
##########
ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java:
##########
@@ -69,40 +79,48 @@ public AlterTableCompactOperation(DDLOperationContext context, AlterTableCompact
compactionRequest.setNumberOfBuckets(desc.getNumberOfBuckets());
}
- InitiatorBase initiatorBase = new InitiatorBase();
- initiatorBase.setConf(context.getConf());
- initiatorBase.init(new AtomicBoolean());
-
- Map<String, org.apache.hadoop.hive.metastore.api.Partition> partitionMap =
- convertPartitionsFromThriftToDB(getPartitions(table, desc, context));
-
- if(desc.getPartitionSpec() != null){
- Optional<String> partitionName = partitionMap.keySet().stream().findFirst();
- partitionName.ifPresent(compactionRequest::setPartitionname);
- }
- List<CompactionResponse> compactionResponses =
- initiatorBase.initiateCompactionForTable(compactionRequest, table.getTTable(), partitionMap);
- for (CompactionResponse compactionResponse : compactionResponses) {
- if (!compactionResponse.isAccepted()) {
- String message;
- if (compactionResponse.isSetErrormessage()) {
- message = compactionResponse.getErrormessage();
- throw new HiveException(ErrorMsg.COMPACTION_REFUSED, table.getDbName(), table.getTableName(),
- "CompactionId: " + compactionResponse.getId(), message);
- }
- context.getConsole().printInfo(
- "Compaction already enqueued with id " + compactionResponse.getId() + "; State is "
- + compactionResponse.getState());
- continue;
+ //Will directly initiate compaction if an un-partitioned table/a partition is specified in the request
+ if (desc.getPartitionSpec() != null || !table.isPartitioned()) {
+ if (desc.getPartitionSpec() != null) {
+ Optional<String> partitionName = partitionMap.keySet().stream().findFirst();
+ partitionName.ifPresent(compactionRequest::setPartitionname);
}
- context.getConsole().printInfo("Compaction enqueued with id " + compactionResponse.getId());
- if (desc.isBlocking() && compactionResponse.isAccepted()) {
- waitForCompactionToFinish(compactionResponse, context);
+ CompactionResponse compactionResponse = initiatorBase.initiateCompactionForTable(compactionRequest);
Review Comment:
initiateCompactionForTable is used for a straightforward compaction request, i.e. when the table is non-partitioned or a specific partition is given.
initiateCompactionForPartition, by contrast, first checks whether the partition is eligible for compaction and initiates compaction only if it is. This is the reason why I had to pass the Table and Partition objects: to reuse methods from Initiator when deciding whether to initiate compaction or not.
**Background**
The supported cases for the alter table compact command are:
```
1. alter table <table_name> compact '<major/minor>'; -- for an unpartitioned table
2. alter table <table_name> partition <partition_name> compact '<major/minor>'; -- to compact a specific partition of a partitioned table
```
Without a partition name, this command does not work for partitioned tables: it fails with an error stating that the command won't work for a partitioned table.
This has been enhanced (https://issues.apache.org/jira/browse/HIVE-27598) to support partitioned tables as well, for users who want to compact all the eligible partitions in one command instead of executing alter table 1000 times for 1000 partitions.
So the plan is to check each partition and initiate compaction only if the partition is eligible to be compacted; otherwise a high partition count would produce a flood of unnecessary compaction requests.
`3. alter table <partitioned_table_name> compact '<major/minor>'; -- scans all partitions and initiates compaction for the eligible ones`
For case 2, since the user asked to compact a specific partition, we initiate compaction straight away and do not check the eligibility of the partition (this is the existing behaviour).
A possible use case is a partition that needs to be compacted irrespective of the delta folder count threshold for some reason.
Use case:
```
create table test_acid(id int) partitioned by(dt string);
insert into test_acid values(1,"a");
insert into test_acid values(2,"a");
insert into test_acid values(3,"a");
insert into test_acid values(4,"a");
alter table test_acid partition(dt='a') compact 'major'; -- the partition gets compacted even though it has only 4 delta folders
```
Before compaction the partition contains 4 delta folders; after compaction it contains only a single base directory.
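
To make the three cases above concrete, here is a minimal, self-contained sketch of the dispatch logic. It does not use the actual Hive classes: `CompactDispatchSketch`, `partitionsToCompact`, and `DELTA_THRESHOLD` are hypothetical names, and the eligibility check is reduced to a bare delta-count comparison, whereas the real Initiator applies more involved rules.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not the code from this PR.
class CompactDispatchSketch {

  // Assumed stand-in for the delta folder count threshold mentioned above.
  static final int DELTA_THRESHOLD = 10;

  /**
   * Returns the targets for which a compaction request would be enqueued.
   *
   * @param partitioned        whether the table is partitioned
   * @param requestedPartition partition named in the command, or null if none
   * @param deltaCounts        partition name -> number of delta folders
   */
  static List<String> partitionsToCompact(boolean partitioned,
                                          String requestedPartition,
                                          Map<String, Integer> deltaCounts) {
    List<String> targets = new ArrayList<>();
    if (!partitioned) {
      // Case 1: unpartitioned table, enqueue directly.
      targets.add("<whole table>");
      return targets;
    }
    if (requestedPartition != null) {
      // Case 2: explicit partition, enqueue directly without an eligibility check.
      targets.add(requestedPartition);
      return targets;
    }
    // Case 3: no partition given, enqueue only the partitions that look eligible.
    for (Map.Entry<String, Integer> entry : deltaCounts.entrySet()) {
      if (entry.getValue() >= DELTA_THRESHOLD) {
        targets.add(entry.getKey());
      }
    }
    return targets;
  }

  public static void main(String[] args) {
    Map<String, Integer> deltas = new LinkedHashMap<>();
    deltas.put("dt=a", 4);
    deltas.put("dt=b", 12);
    // Explicit partition: compacted even though it is below the threshold.
    System.out.println(partitionsToCompact(true, "dt=a", deltas)); // prints [dt=a]
    // No partition: only the partition at or above the threshold is picked.
    System.out.println(partitionsToCompact(true, null, deltas));   // prints [dt=b]
  }
}
```

The point of the split is that an explicit request is treated as the user's decision, while the table-wide form filters first so that a table with many partitions does not enqueue a request per partition.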
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]