This is an automated email from the ASF dual-hosted git repository. dkuzmenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new ff8d843931f HIVE-28341: Iceberg: Incremental Major QB Full table compaction (Dmitriy Fingerman, reviewed by Denys Kuzmenko) ff8d843931f is described below commit ff8d843931fa1c9b73eee88152313e7710753269 Author: Dmitriy Fingerman <dmitriy.finger...@gmail.com> AuthorDate: Tue Aug 6 07:32:29 2024 -0400 HIVE-28341: Iceberg: Incremental Major QB Full table compaction (Dmitriy Fingerman, reviewed by Denys Kuzmenko) Closes #5328 --- .../java/org/apache/hadoop/hive/ql/ErrorMsg.java | 2 +- .../mr/hive/HiveIcebergOutputCommitter.java | 15 +- .../iceberg/mr/hive/HiveIcebergStorageHandler.java | 34 +-- .../apache/iceberg/mr/hive/IcebergTableUtil.java | 54 ++++- .../compaction/IcebergMajorQueryCompactor.java | 29 ++- .../iceberg_major_compaction_partition_evolution.q | 2 + ...iceberg_major_compaction_partition_evolution2.q | 55 +++++ .../iceberg_major_compaction_partitioned.q | 2 + .../iceberg_major_compaction_schema_evolution.q | 2 + ...berg_major_compaction_partition_evolution.q.out | 8 +- ...erg_major_compaction_partition_evolution2.q.out | 263 +++++++++++++++++++++ .../iceberg_major_compaction_partitioned.q.out | 17 +- ...iceberg_major_compaction_schema_evolution.q.out | 7 +- .../test/resources/testconfiguration.properties | 1 + ql/src/java/org/apache/hadoop/hive/ql/Context.java | 4 +- .../compact/AlterTableCompactOperation.java | 13 +- .../org/apache/hadoop/hive/ql/metadata/Hive.java | 2 +- .../hive/ql/metadata/HiveStorageHandler.java | 31 ++- .../hive/ql/txn/compactor/CompactorUtil.java | 4 + 19 files changed, 485 insertions(+), 60 deletions(-) diff --git a/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java index 1d8ebdcd0bc..15174dcc500 100644 --- a/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java +++ b/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java @@ -373,7 +373,7 @@ public enum ErrorMsg { "metastore."), 
INVALID_COMPACTION_TYPE(10282, "Invalid compaction type, supported values are 'major' and " + "'minor'"), - NO_COMPACTION_PARTITION(10283, "You must specify a partition to compact for partitioned tables"), + COMPACTION_NO_PARTITION(10283, "You must specify a partition to compact for partitioned tables"), TOO_MANY_COMPACTION_PARTITIONS(10284, "Compaction can only be requested on one partition at a " + "time."), DISTINCT_NOT_SUPPORTED(10285, "Distinct keyword is not support in current context"), diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java index 66c34361fba..275681199e3 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java @@ -35,6 +35,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.function.Predicate; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; @@ -604,11 +605,7 @@ public class HiveIcebergOutputCommitter extends OutputCommitter { */ private void commitCompaction(Table table, long startTime, FilesForCommit results, RewritePolicy rewritePolicy, Integer partitionSpecId, String partitionPath) { - if (results.dataFiles().isEmpty()) { - LOG.info("Empty compaction commit, took {} ms for table: {}", System.currentTimeMillis() - startTime, table); - return; - } - if (rewritePolicy == RewritePolicy.ALL_PARTITIONS) { + if (rewritePolicy == RewritePolicy.FULL_TABLE) { // Full table compaction Transaction transaction = table.newTransaction(); DeleteFiles delete = transaction.newDelete(); @@ -621,8 +618,12 @@ public class HiveIcebergOutputCommitter 
extends OutputCommitter { LOG.debug("Compacted full table with files {}", results); } else { // Single partition compaction - List<DataFile> existingDataFiles = IcebergTableUtil.getDataFiles(table, partitionSpecId, partitionPath); - List<DeleteFile> existingDeleteFiles = IcebergTableUtil.getDeleteFiles(table, partitionSpecId, partitionPath); + List<DataFile> existingDataFiles = + IcebergTableUtil.getDataFiles(table, partitionSpecId, partitionPath, + partitionPath == null ? Predicate.isEqual(partitionSpecId).negate() : Predicate.isEqual(partitionSpecId)); + List<DeleteFile> existingDeleteFiles = + IcebergTableUtil.getDeleteFiles(table, partitionSpecId, partitionPath, + partitionPath == null ? Predicate.isEqual(partitionSpecId).negate() : Predicate.isEqual(partitionSpecId)); RewriteFiles rewriteFiles = table.newRewrite(); rewriteFiles.validateFromSnapshot(table.currentSnapshot().snapshotId()); diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java index 071130f0977..ded1d194358 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java @@ -1912,11 +1912,12 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H } private boolean hasUndergonePartitionEvolution(Table table) { - // If it is a table which has undergone partition evolution, return true. + // The current spec is not necessarily the latest, which can happen when partition spec was changed to one of + // table's past specs. 
return table.currentSnapshot() != null && table.currentSnapshot().allManifests(table.io()).parallelStream() .map(ManifestFile::partitionSpecId) - .anyMatch(id -> id < table.spec().specId()); + .anyMatch(id -> id != table.spec().specId()); } private boolean isIdentityPartitionTable(org.apache.hadoop.hive.ql.metadata.Table table) { @@ -1935,8 +1936,9 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H @Override public List<Partition> getPartitions(org.apache.hadoop.hive.ql.metadata.Table table, - Map<String, String> partitionSpec) throws SemanticException { - return getPartitionNames(table, partitionSpec).stream() + Map<String, String> partitionSpec, boolean latestSpecOnly) throws SemanticException { + Table icebergTable = IcebergTableUtil.getTable(conf, table.getTTable()); + return IcebergTableUtil.getPartitionNames(icebergTable, partitionSpec, latestSpecOnly).stream() .map(partName -> { Map<String, String> partSpecMap = Maps.newLinkedHashMap(); Warehouse.makeSpecFromName(partSpecMap, new Path(partName), null); @@ -1944,6 +1946,11 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H }).collect(Collectors.toList()); } + public boolean isPartitioned(org.apache.hadoop.hive.ql.metadata.Table hmsTable) { + Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable()); + return table.spec().isPartitioned(); + } + @Override public Partition getPartition(org.apache.hadoop.hive.ql.metadata.Table table, Map<String, String> partitionSpec, Context.RewritePolicy policy) throws SemanticException { @@ -1968,21 +1975,10 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H * @return A list of partition values which satisfies the partition spec provided corresponding to the table. * @throws SemanticException Exception raised when there is an issue performing a scan on the partitions table. 
*/ - @Override public List<String> getPartitionNames(org.apache.hadoop.hive.ql.metadata.Table hmsTable, Map<String, String> partitionSpec) throws SemanticException { Table icebergTable = IcebergTableUtil.getTable(conf, hmsTable.getTTable()); - - try { - return IcebergTableUtil - .getPartitionInfo(icebergTable, partitionSpec, true).entrySet().stream().map(e -> { - PartitionData partitionData = e.getKey(); - int specId = e.getValue(); - return icebergTable.specs().get(specId).partitionToPath(partitionData); - }).collect(Collectors.toList()); - } catch (IOException e) { - throw new SemanticException(String.format("Error while fetching the partitions due to: %s", e)); - } + return IcebergTableUtil.getPartitionNames(icebergTable, partitionSpec, true); } /** @@ -2132,4 +2128,10 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H tableDesc.setProperty(HiveCustomStorageHandlerUtils.WRITE_OPERATION_CONFIG_PREFIX + tableDesc.getTableName(), Operation.DELETE.name()); } + + @Override + public boolean hasUndergonePartitionEvolution(org.apache.hadoop.hive.ql.metadata.Table hmsTable) { + Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable()); + return hasUndergonePartitionEvolution(table); + } } diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java index 2016b68905a..9a661bdaa73 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java @@ -27,6 +27,7 @@ import java.util.Optional; import java.util.Properties; import java.util.function.BinaryOperator; import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; @@ -393,28 +394,47 @@ 
public class IcebergTableUtil { return data; } - public static List<DataFile> getDataFiles(Table table, int specId, - String partitionPath) { + /** + * Returns list of data files filtered by specId and partitionPath as following: + * 1. If matchBySpecId is true, then filters files by specId == file's specId, else by specId != file's specId + * 2. If partitionPath is not null, then also filters files where partitionPath == file's partition path + * @param table the iceberg table + * @param specId partition spec id + * @param partitionPath partition path + * @param matchBySpecId filter that's applied on data files' spec ids + */ + public static List<DataFile> getDataFiles(Table table, int specId, String partitionPath, + Predicate<Object> matchBySpecId) { CloseableIterable<FileScanTask> fileScanTasks = table.newScan().useSnapshot(table.currentSnapshot().snapshotId()).ignoreResiduals().planFiles(); CloseableIterable<FileScanTask> filteredFileScanTasks = CloseableIterable.filter(fileScanTasks, t -> { DataFile file = t.asFileScanTask().file(); - return file.specId() == specId && table.specs() - .get(specId).partitionToPath(file.partition()).equals(partitionPath); + return matchBySpecId.test(file.specId()) && (partitionPath == null || (partitionPath != null && + table.specs().get(specId).partitionToPath(file.partition()).equals(partitionPath))); }); return Lists.newArrayList(CloseableIterable.transform(filteredFileScanTasks, t -> t.file())); } - public static List<DeleteFile> getDeleteFiles(Table table, int specId, String partitionPath) { + /** + * Returns list of delete files filtered by specId and partitionPath as following: + * 1. If matchBySpecId is true, then filters files by specId == file's specId, else by specId != file's specId + * 2. 
If partitionPath is not null, then also filters files where partitionPath == file's partition path + * @param table the iceberg table + * @param specId partition spec id + * @param partitionPath partition path + * @param matchBySpecId filter that's applied on delete files' spec ids + */ + public static List<DeleteFile> getDeleteFiles(Table table, int specId, String partitionPath, + Predicate<Object> matchBySpecId) { Table deletesTable = MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES); CloseableIterable<ScanTask> deletesScanTasks = deletesTable.newBatchScan().planFiles(); CloseableIterable<ScanTask> filteredDeletesScanTasks = CloseableIterable.filter(deletesScanTasks, t -> { DeleteFile file = ((PositionDeletesScanTask) t).file(); - return file.specId() == specId && table.specs() - .get(specId).partitionToPath(file.partition()).equals(partitionPath); + return matchBySpecId.test(file.specId()) && (partitionPath == null || (partitionPath != null && + table.specs().get(specId).partitionToPath(file.partition()).equals(partitionPath))); }); return Lists.newArrayList(CloseableIterable.transform(filteredDeletesScanTasks, t -> ((PositionDeletesScanTask) t).file())); @@ -465,7 +485,7 @@ public class IcebergTableUtil { } public static Map<PartitionData, Integer> getPartitionInfo(Table icebergTable, Map<String, String> partSpecMap, - boolean allowPartialSpec) throws SemanticException, IOException { + boolean allowPartialSpec, boolean latestSpecOnly) throws SemanticException, IOException { Expression expression = IcebergTableUtil.generateExpressionFromPartitionSpec(icebergTable, partSpecMap); PartitionsTable partitionsTable = (PartitionsTable) MetadataTableUtils .createMetadataTableInstance(icebergTable, MetadataTableType.PARTITIONS); @@ -484,10 +504,26 @@ public class IcebergTableUtil { ResidualEvaluator resEval = ResidualEvaluator.of(icebergTable.specs().get(entry.getValue()), expression, false); return 
resEval.residualFor(entry.getKey()).isEquivalentTo(Expressions.alwaysTrue()) && - (entry.getKey().size() == partSpecMap.size() || allowPartialSpec); + (entry.getKey().size() == partSpecMap.size() || allowPartialSpec) && + (entry.getValue() == icebergTable.spec().specId() || !latestSpecOnly); }).forEach(entry -> result.put(entry.getKey(), entry.getValue()))); } return result; } + + public static List<String> getPartitionNames(Table icebergTable, Map<String, String> partitionSpec, + boolean latestSpecOnly) throws SemanticException { + try { + return IcebergTableUtil + .getPartitionInfo(icebergTable, partitionSpec, true, latestSpecOnly).entrySet().stream() + .map(e -> { + PartitionData partitionData = e.getKey(); + int specId = e.getValue(); + return icebergTable.specs().get(specId).partitionToPath(partitionData); + }).collect(Collectors.toList()); + } catch (IOException e) { + throw new SemanticException(String.format("Error while fetching the partitions due to: %s", e)); + } + } } diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergMajorQueryCompactor.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergMajorQueryCompactor.java index 6173c804b4b..411031b50c8 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergMajorQueryCompactor.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergMajorQueryCompactor.java @@ -63,20 +63,35 @@ public class IcebergMajorQueryCompactor extends QueryCompactor { HiveConf conf = new HiveConf(context.getConf()); String partSpec = context.getCompactionInfo().partName; + org.apache.hadoop.hive.ql.metadata.Table table = Hive.get(conf).getTable(context.getTable().getDbName(), + context.getTable().getTableName()); + Table icebergTable = IcebergTableUtil.getTable(conf, table.getTTable()); String compactionQuery; if (partSpec == null) { - HiveConf.setVar(conf, ConfVars.REWRITE_POLICY, 
RewritePolicy.ALL_PARTITIONS.name()); - compactionQuery = String.format("insert overwrite table %s select * from %<s", compactTableName); + if (!icebergTable.spec().isPartitioned()) { + HiveConf.setVar(conf, ConfVars.REWRITE_POLICY, RewritePolicy.FULL_TABLE.name()); + compactionQuery = String.format("insert overwrite table %s select * from %<s", compactTableName); + } else if (icebergTable.specs().size() > 1) { + // Compacting partitions of old partition specs on a partitioned table with partition evolution + HiveConf.setVar(conf, ConfVars.REWRITE_POLICY, RewritePolicy.PARTITION.name()); + conf.set(IcebergCompactionService.PARTITION_SPEC_ID, String.valueOf(icebergTable.spec().specId())); + // A single filter on a virtual column causes errors during compilation, + // added another filter on file_path as a workaround. + compactionQuery = String.format("insert overwrite table %1$s select * from %1$s " + + "where %2$s != %3$d and %4$s is not null", + compactTableName, VirtualColumn.PARTITION_SPEC_ID.getName(), icebergTable.spec().specId(), + VirtualColumn.FILE_PATH.getName()); + } else { + // Partitioned table without partition evolution with partition spec as null in the compaction request - this + // code branch is not supposed to be reachable + throw new HiveException(ErrorMsg.COMPACTION_NO_PARTITION); + } } else { - org.apache.hadoop.hive.ql.metadata.Table table = Hive.get(conf).getTable(context.getTable().getDbName(), - context.getTable().getTableName()); Map<String, String> partSpecMap = new LinkedHashMap<>(); Warehouse.makeSpecFromName(partSpecMap, new Path(partSpec), null); - - Table icebergTable = IcebergTableUtil.getTable(conf, table.getTTable()); Map<PartitionData, Integer> partitionInfo = IcebergTableUtil - .getPartitionInfo(icebergTable, partSpecMap, false); + .getPartitionInfo(icebergTable, partSpecMap, false, false); Optional<Integer> specId = partitionInfo.values().stream().findFirst(); if (!specId.isPresent()) { diff --git 
a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution.q index aef5ae0ef23..7d046aea4e9 100644 --- a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution.q +++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution.q @@ -14,6 +14,8 @@ --! qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/ -- Mask compaction id as they will be allocated in parallel threads --! qt:replace:/^[0-9]/#Masked#/ +-- Mask removed file size +--! qt:replace:/(\S\"removed-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/ set hive.llap.io.enabled=true; set hive.vectorized.execution.enabled=true; diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution2.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution2.q new file mode 100644 index 00000000000..3fa6f91f90b --- /dev/null +++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution2.q @@ -0,0 +1,55 @@ +-- SORT_QUERY_RESULTS +-- Mask neededVirtualColumns due to non-strict order +--! qt:replace:/(\s+neededVirtualColumns:\s)(.*)/$1#Masked#/ +-- Mask random uuid +--! qt:replace:/(\s+uuid\s+)\S+(\s*)/$1#Masked#$2/ +-- Mask a random snapshot id +--! qt:replace:/(\s+current-snapshot-id\s+)\S+(\s*)/$1#Masked#/ +-- Mask added file size +--! qt:replace:/(\S\"added-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/ +-- Mask total file size +--! qt:replace:/(\S\"total-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/ +-- Mask current-snapshot-timestamp-ms +--! qt:replace:/(\s+current-snapshot-timestamp-ms\s+)\S+(\s*)/$1#Masked#$2/ +--! qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/ +-- Mask compaction id as they will be allocated in parallel threads +--! 
qt:replace:/^[0-9]/#Masked#/ +-- Mask removed file size +--! qt:replace:/(\S\"removed-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/ + +set hive.llap.io.enabled=true; +set hive.vectorized.execution.enabled=true; +set hive.optimize.shared.work.merge.ts.schema=true; + +create table ice_orc ( + first_name string, + last_name string, + dept_id bigint + ) +stored by iceberg stored as orc +tblproperties ('format-version'='2'); + +insert into ice_orc VALUES ('fn1','ln1', 1); +insert into ice_orc VALUES ('fn2','ln2', 1); +insert into ice_orc VALUES ('fn3','ln3', 1); +insert into ice_orc VALUES ('fn4','ln4', 1); +delete from ice_orc where last_name in ('ln3', 'ln4'); + +alter table ice_orc set partition spec(dept_id); + +insert into ice_orc PARTITION(dept_id=2) VALUES ('fn5','ln5'); +insert into ice_orc PARTITION(dept_id=2) VALUES ('fn6','ln6'); +insert into ice_orc PARTITION(dept_id=2) VALUES ('fn7','ln7'); +insert into ice_orc PARTITION(dept_id=2) VALUES ('fn8','ln8'); +delete from ice_orc where last_name in ('ln7', 'ln8'); + +select * from ice_orc; +describe formatted ice_orc; +show compactions; + +alter table ice_orc COMPACT 'major' and wait; + +select * from ice_orc; +describe formatted ice_orc; +show compactions; + diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partitioned.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partitioned.q index 9d766c6b833..cc0d2fa23b9 100644 --- a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partitioned.q +++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partitioned.q @@ -14,6 +14,8 @@ --! qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/ -- Mask compaction id as they will be allocated in parallel threads --! qt:replace:/^[0-9]/#Masked#/ +-- Mask removed file size +--! 
qt:replace:/(\S\"removed-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/ set hive.llap.io.enabled=true; set hive.vectorized.execution.enabled=true; diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_schema_evolution.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_schema_evolution.q index 73dbe19a94b..8501e694de0 100644 --- a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_schema_evolution.q +++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_schema_evolution.q @@ -14,6 +14,8 @@ --! qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/ -- Mask compaction id as they will be allocated in parallel threads --! qt:replace:/^[0-9]/#Masked#/ +-- Mask removed file size +--! qt:replace:/(\S\"removed-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/ set hive.llap.io.enabled=true; set hive.vectorized.execution.enabled=true; diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution.q.out index 3f8d8914c5e..d9dee1d9902 100644 --- a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution.q.out +++ b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution.q.out @@ -239,7 +239,7 @@ Table Parameters: bucketing_version 2 current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"},{\"id\":4,\"name\":\"team_id\",\"required\":false,\"type\":\"long\"},{\"id\":5,\"name\":\"company_id\",\"required\":false,\"type\":\"long\"}]} current-snapshot-id #Masked# - current-snapshot-summary 
{\"deleted-data-files\":\"2\",\"deleted-records\":\"2\",\"removed-files-size\":\"1256\",\"changed-partition-count\":\"2\",\"total-records\":\"14\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"14\",\"total-delete-files\":\"8\",\"total-position-deletes\":\"8\",\"total-equality-deletes\":\"0\"} + current-snapshot-summary {\"deleted-data-files\":\"2\",\"deleted-records\":\"2\",\"removed-files-size\":\"#Masked#\",\"changed-partition-count\":\"2\",\"total-records\":\"14\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"14\",\"total-delete-files\":\"8\",\"total-position-deletes\":\"8\",\"total-equality-deletes\":\"0\"} current-snapshot-timestamp-ms #Masked# default-partition-spec {\"spec-id\":1,\"fields\":[{\"name\":\"company_id\",\"transform\":\"identity\",\"source-id\":5,\"field-id\":1000},{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1001}]} format-version 2 @@ -340,7 +340,7 @@ Table Parameters: bucketing_version 2 current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"},{\"id\":4,\"name\":\"team_id\",\"required\":false,\"type\":\"long\"},{\"id\":5,\"name\":\"company_id\",\"required\":false,\"type\":\"long\"}]} current-snapshot-id #Masked# - current-snapshot-summary {\"replace-partitions\":\"true\",\"added-data-files\":\"2\",\"added-records\":\"6\",\"added-files-size\":\"#Masked#\",\"changed-partition-count\":\"2\",\"total-records\":\"6\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"2\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\"} + current-snapshot-summary 
{\"added-data-files\":\"1\",\"deleted-data-files\":\"8\",\"removed-position-delete-files\":\"5\",\"removed-delete-files\":\"5\",\"added-records\":\"3\",\"deleted-records\":\"8\",\"added-files-size\":\"#Masked#\",\"removed-files-size\":\"#Masked#\",\"removed-position-deletes\":\"5\",\"changed-partition-count\":\"5\",\"total-records\":\"6\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"2\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\" [...] current-snapshot-timestamp-ms #Masked# default-partition-spec {\"spec-id\":1,\"fields\":[{\"name\":\"company_id\",\"transform\":\"identity\",\"source-id\":5,\"field-id\":1000},{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1001}]} format-version 2 @@ -352,7 +352,7 @@ Table Parameters: #### A masked pattern was here #### rawDataSize 0 serialization.format 1 - snapshot-count 19 + snapshot-count 20 storage_handler org.apache.iceberg.mr.hive.HiveIcebergStorageHandler table_type ICEBERG totalSize #Masked# @@ -374,4 +374,6 @@ PREHOOK: type: SHOW COMPACTIONS POSTHOOK: query: show compactions POSTHOOK: type: SHOW COMPACTIONS CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId +#Masked# default ice_orc company_id=100/dept_id=1 MAJOR succeeded #Masked# manual default 0 0 0 --- +#Masked# default ice_orc company_id=100/dept_id=2 MAJOR succeeded #Masked# manual default 0 0 0 --- #Masked# default ice_orc --- MAJOR succeeded #Masked# manual default 0 0 0 --- diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution2.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution2.q.out new file mode 100644 index 00000000000..06611f2f27a --- /dev/null +++ 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution2.q.out @@ -0,0 +1,263 @@ +PREHOOK: query: create table ice_orc ( + first_name string, + last_name string, + dept_id bigint + ) +stored by iceberg stored as orc +tblproperties ('format-version'='2') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@ice_orc +POSTHOOK: query: create table ice_orc ( + first_name string, + last_name string, + dept_id bigint + ) +stored by iceberg stored as orc +tblproperties ('format-version'='2') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@ice_orc +PREHOOK: query: insert into ice_orc VALUES ('fn1','ln1', 1) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@ice_orc +POSTHOOK: query: insert into ice_orc VALUES ('fn1','ln1', 1) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@ice_orc +PREHOOK: query: insert into ice_orc VALUES ('fn2','ln2', 1) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@ice_orc +POSTHOOK: query: insert into ice_orc VALUES ('fn2','ln2', 1) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@ice_orc +PREHOOK: query: insert into ice_orc VALUES ('fn3','ln3', 1) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@ice_orc +POSTHOOK: query: insert into ice_orc VALUES ('fn3','ln3', 1) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@ice_orc +PREHOOK: query: insert into ice_orc VALUES ('fn4','ln4', 1) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@ice_orc +POSTHOOK: query: insert into ice_orc VALUES ('fn4','ln4', 1) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@ice_orc 
+PREHOOK: query: delete from ice_orc where last_name in ('ln3', 'ln4') +PREHOOK: type: QUERY +PREHOOK: Input: default@ice_orc +#### A masked pattern was here #### +POSTHOOK: query: delete from ice_orc where last_name in ('ln3', 'ln4') +POSTHOOK: type: QUERY +POSTHOOK: Input: default@ice_orc +#### A masked pattern was here #### +PREHOOK: query: alter table ice_orc set partition spec(dept_id) +PREHOOK: type: ALTERTABLE_SETPARTSPEC +PREHOOK: Input: default@ice_orc +POSTHOOK: query: alter table ice_orc set partition spec(dept_id) +POSTHOOK: type: ALTERTABLE_SETPARTSPEC +POSTHOOK: Input: default@ice_orc +POSTHOOK: Output: default@ice_orc +PREHOOK: query: insert into ice_orc PARTITION(dept_id=2) VALUES ('fn5','ln5') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@ice_orc@dept_id=2 +POSTHOOK: query: insert into ice_orc PARTITION(dept_id=2) VALUES ('fn5','ln5') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@ice_orc@dept_id=2 +PREHOOK: query: insert into ice_orc PARTITION(dept_id=2) VALUES ('fn6','ln6') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@ice_orc@dept_id=2 +POSTHOOK: query: insert into ice_orc PARTITION(dept_id=2) VALUES ('fn6','ln6') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@ice_orc@dept_id=2 +PREHOOK: query: insert into ice_orc PARTITION(dept_id=2) VALUES ('fn7','ln7') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@ice_orc@dept_id=2 +POSTHOOK: query: insert into ice_orc PARTITION(dept_id=2) VALUES ('fn7','ln7') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@ice_orc@dept_id=2 +PREHOOK: query: insert into ice_orc PARTITION(dept_id=2) VALUES ('fn8','ln8') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@ice_orc@dept_id=2 
+POSTHOOK: query: insert into ice_orc PARTITION(dept_id=2) VALUES ('fn8','ln8') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@ice_orc@dept_id=2 +PREHOOK: query: delete from ice_orc where last_name in ('ln7', 'ln8') +PREHOOK: type: QUERY +PREHOOK: Input: default@ice_orc +#### A masked pattern was here #### +POSTHOOK: query: delete from ice_orc where last_name in ('ln7', 'ln8') +POSTHOOK: type: QUERY +POSTHOOK: Input: default@ice_orc +#### A masked pattern was here #### +PREHOOK: query: select * from ice_orc +PREHOOK: type: QUERY +PREHOOK: Input: default@ice_orc +#### A masked pattern was here #### +POSTHOOK: query: select * from ice_orc +POSTHOOK: type: QUERY +POSTHOOK: Input: default@ice_orc +#### A masked pattern was here #### +fn1 ln1 1 +fn2 ln2 1 +fn5 ln5 2 +fn6 ln6 2 +PREHOOK: query: describe formatted ice_orc +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@ice_orc +POSTHOOK: query: describe formatted ice_orc +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@ice_orc +# col_name data_type comment +first_name string +last_name string +dept_id bigint + +# Partition Transform Information +# col_name transform_type +dept_id IDENTITY + +# Detailed Table Information +Database: default +#### A masked pattern was here #### +Retention: 0 +#### A masked pattern was here #### +Table Type: EXTERNAL_TABLE +Table Parameters: + COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"dept_id\":\"true\",\"first_name\":\"true\",\"last_name\":\"true\"}} + EXTERNAL TRUE + bucketing_version 2 + current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"}]} + current-snapshot-id #Masked# + current-snapshot-summary 
{\"deleted-data-files\":\"2\",\"deleted-records\":\"2\",\"removed-files-size\":\"#Masked#\",\"changed-partition-count\":\"1\",\"total-records\":\"4\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"4\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\"} + current-snapshot-timestamp-ms #Masked# + default-partition-spec {\"spec-id\":1,\"fields\":[{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1000}]} + format-version 2 + iceberg.orc.files.only true +#### A masked pattern was here #### + numFiles 4 + numRows 4 + parquet.compression zstd +#### A masked pattern was here #### + rawDataSize 0 + serialization.format 1 + snapshot-count 10 + storage_handler org.apache.iceberg.mr.hive.HiveIcebergStorageHandler + table_type ICEBERG + totalSize #Masked# +#### A masked pattern was here #### + uuid #Masked# + write.delete.mode merge-on-read + write.format.default orc + write.merge.mode merge-on-read + write.update.mode merge-on-read + +# Storage Information +SerDe Library: org.apache.iceberg.mr.hive.HiveIcebergSerDe +InputFormat: org.apache.iceberg.mr.hive.HiveIcebergInputFormat +OutputFormat: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat +Compressed: No +Sort Columns: [] +PREHOOK: query: show compactions +PREHOOK: type: SHOW COMPACTIONS +POSTHOOK: query: show compactions +POSTHOOK: type: SHOW COMPACTIONS +CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId +PREHOOK: query: alter table ice_orc COMPACT 'major' and wait +PREHOOK: type: ALTERTABLE_COMPACT +PREHOOK: Input: default@ice_orc +PREHOOK: Output: default@ice_orc +POSTHOOK: query: alter table ice_orc COMPACT 'major' and wait +POSTHOOK: type: ALTERTABLE_COMPACT +POSTHOOK: Input: default@ice_orc +POSTHOOK: Output: default@ice_orc +PREHOOK: query: select * from ice_orc +PREHOOK: type: 
QUERY +PREHOOK: Input: default@ice_orc +#### A masked pattern was here #### +POSTHOOK: query: select * from ice_orc +POSTHOOK: type: QUERY +POSTHOOK: Input: default@ice_orc +#### A masked pattern was here #### +fn1 ln1 1 +fn2 ln2 1 +fn5 ln5 2 +fn6 ln6 2 +PREHOOK: query: describe formatted ice_orc +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@ice_orc +POSTHOOK: query: describe formatted ice_orc +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@ice_orc +# col_name data_type comment +first_name string +last_name string +dept_id bigint + +# Partition Transform Information +# col_name transform_type +dept_id IDENTITY + +# Detailed Table Information +Database: default +#### A masked pattern was here #### +Retention: 0 +#### A masked pattern was here #### +Table Type: EXTERNAL_TABLE +Table Parameters: + COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"dept_id\":\"true\",\"first_name\":\"true\",\"last_name\":\"true\"}} + EXTERNAL TRUE + bucketing_version 2 + current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"}]} + current-snapshot-id #Masked# + current-snapshot-summary {\"added-data-files\":\"1\",\"deleted-data-files\":\"2\",\"added-records\":\"2\",\"deleted-records\":\"2\",\"added-files-size\":\"#Masked#\",\"removed-files-size\":\"#Masked#\",\"changed-partition-count\":\"2\",\"total-records\":\"4\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"2\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\"} + current-snapshot-timestamp-ms #Masked# + default-partition-spec {\"spec-id\":1,\"fields\":[{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1000}]} + format-version 2 + iceberg.orc.files.only true +#### A masked pattern was here #### + numFiles 
2 + numRows 4 + parquet.compression zstd +#### A masked pattern was here #### + rawDataSize 0 + serialization.format 1 + snapshot-count 12 + storage_handler org.apache.iceberg.mr.hive.HiveIcebergStorageHandler + table_type ICEBERG + totalSize #Masked# +#### A masked pattern was here #### + uuid #Masked# + write.delete.mode merge-on-read + write.format.default orc + write.merge.mode merge-on-read + write.update.mode merge-on-read + +# Storage Information +SerDe Library: org.apache.iceberg.mr.hive.HiveIcebergSerDe +InputFormat: org.apache.iceberg.mr.hive.HiveIcebergInputFormat +OutputFormat: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat +Compressed: No +Sort Columns: [] +PREHOOK: query: show compactions +PREHOOK: type: SHOW COMPACTIONS +POSTHOOK: query: show compactions +POSTHOOK: type: SHOW COMPACTIONS +CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId +#Masked# default ice_orc dept_id=2 MAJOR succeeded #Masked# manual default 0 0 0 --- +#Masked# default ice_orc --- MAJOR succeeded #Masked# manual default 0 0 0 --- diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partitioned.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partitioned.q.out index c9314bc4d03..781bb41dd14 100644 --- a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partitioned.q.out +++ b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partitioned.q.out @@ -191,7 +191,7 @@ Table Parameters: bucketing_version 2 current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"}]} 
current-snapshot-id #Masked# - current-snapshot-summary {\"deleted-data-files\":\"3\",\"deleted-records\":\"3\",\"removed-files-size\":\"1440\",\"changed-partition-count\":\"2\",\"total-records\":\"11\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"11\",\"total-delete-files\":\"7\",\"total-position-deletes\":\"7\",\"total-equality-deletes\":\"0\"} + current-snapshot-summary {\"deleted-data-files\":\"3\",\"deleted-records\":\"3\",\"removed-files-size\":\"#Masked#\",\"changed-partition-count\":\"2\",\"total-records\":\"11\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"11\",\"total-delete-files\":\"7\",\"total-position-deletes\":\"7\",\"total-equality-deletes\":\"0\"} current-snapshot-timestamp-ms #Masked# default-partition-spec {\"spec-id\":0,\"fields\":[{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1000}]} format-version 2 @@ -287,7 +287,7 @@ Table Parameters: bucketing_version 2 current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"}]} current-snapshot-id #Masked# - current-snapshot-summary {\"replace-partitions\":\"true\",\"added-data-files\":\"2\",\"added-records\":\"4\",\"added-files-size\":\"#Masked#\",\"changed-partition-count\":\"2\",\"total-records\":\"4\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"2\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\"} + current-snapshot-summary 
{\"added-data-files\":\"1\",\"deleted-data-files\":\"5\",\"removed-position-delete-files\":\"3\",\"removed-delete-files\":\"3\",\"added-records\":\"2\",\"deleted-records\":\"5\",\"added-files-size\":\"#Masked#\",\"removed-files-size\":\"#Masked#\",\"removed-position-deletes\":\"3\",\"changed-partition-count\":\"1\",\"total-records\":\"4\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"2\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\" [...] current-snapshot-timestamp-ms #Masked# default-partition-spec {\"spec-id\":0,\"fields\":[{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1000}]} format-version 2 @@ -321,7 +321,8 @@ PREHOOK: type: SHOW COMPACTIONS POSTHOOK: query: show compactions POSTHOOK: type: SHOW COMPACTIONS CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId -#Masked# default ice_orc --- MAJOR succeeded #Masked# manual default 0 0 0 --- +#Masked# default ice_orc dept_id=1 MAJOR succeeded #Masked# manual default 0 0 0 --- +#Masked# default ice_orc dept_id=2 MAJOR succeeded #Masked# manual default 0 0 0 --- PREHOOK: query: insert into ice_orc VALUES ('fn11','ln11', 1) PREHOOK: type: QUERY PREHOOK: Input: _dummy_database@_dummy_table @@ -517,7 +518,7 @@ Table Parameters: bucketing_version 2 current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"}]} current-snapshot-id #Masked# - current-snapshot-summary 
{\"deleted-data-files\":\"4\",\"deleted-records\":\"4\",\"removed-files-size\":\"1948\",\"changed-partition-count\":\"2\",\"total-records\":\"16\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"14\",\"total-delete-files\":\"8\",\"total-position-deletes\":\"8\",\"total-equality-deletes\":\"0\"} + current-snapshot-summary {\"deleted-data-files\":\"4\",\"deleted-records\":\"4\",\"removed-files-size\":\"#Masked#\",\"changed-partition-count\":\"2\",\"total-records\":\"16\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"14\",\"total-delete-files\":\"8\",\"total-position-deletes\":\"8\",\"total-equality-deletes\":\"0\"} current-snapshot-timestamp-ms #Masked# default-partition-spec {\"spec-id\":0,\"fields\":[{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1000}]} format-version 2 @@ -617,7 +618,7 @@ Table Parameters: bucketing_version 2 current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"}]} current-snapshot-id #Masked# - current-snapshot-summary {\"replace-partitions\":\"true\",\"added-data-files\":\"2\",\"added-records\":\"8\",\"added-files-size\":\"#Masked#\",\"changed-partition-count\":\"2\",\"total-records\":\"8\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"2\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\"} + current-snapshot-summary 
{\"added-data-files\":\"1\",\"deleted-data-files\":\"7\",\"removed-position-delete-files\":\"4\",\"removed-delete-files\":\"4\",\"added-records\":\"4\",\"deleted-records\":\"8\",\"added-files-size\":\"#Masked#\",\"removed-files-size\":\"#Masked#\",\"removed-position-deletes\":\"4\",\"changed-partition-count\":\"1\",\"total-records\":\"8\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"2\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\" [...] current-snapshot-timestamp-ms #Masked# default-partition-spec {\"spec-id\":0,\"fields\":[{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1000}]} format-version 2 @@ -651,5 +652,7 @@ PREHOOK: type: SHOW COMPACTIONS POSTHOOK: query: show compactions POSTHOOK: type: SHOW COMPACTIONS CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId -#Masked# default ice_orc --- MAJOR succeeded #Masked# manual default 0 0 0 --- -#Masked# default ice_orc --- MAJOR succeeded #Masked# manual default 0 0 0 --- +#Masked# default ice_orc dept_id=1 MAJOR succeeded #Masked# manual default 0 0 0 --- +#Masked# default ice_orc dept_id=2 MAJOR succeeded #Masked# manual default 0 0 0 --- +#Masked# default ice_orc dept_id=1 MAJOR succeeded #Masked# manual default 0 0 0 --- +#Masked# default ice_orc dept_id=2 MAJOR succeeded #Masked# manual default 0 0 0 --- diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_schema_evolution.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_schema_evolution.q.out index cfe8f3d3d46..03deb181cf6 100644 --- a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_schema_evolution.q.out +++ b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_schema_evolution.q.out @@ -227,7 +227,7 @@ 
Table Parameters: bucketing_version 2 current-schema {\"type\":\"struct\",\"schema-id\":2,\"fields\":[{\"id\":1,\"name\":\"fname\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"},{\"id\":4,\"name\":\"address\",\"required\":false,\"type\":\"string\"}]} current-snapshot-id #Masked# - current-snapshot-summary {\"deleted-data-files\":\"6\",\"deleted-records\":\"6\",\"removed-files-size\":\"3167\",\"changed-partition-count\":\"2\",\"total-records\":\"10\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"10\",\"total-delete-files\":\"8\",\"total-position-deletes\":\"8\",\"total-equality-deletes\":\"0\"} + current-snapshot-summary {\"deleted-data-files\":\"6\",\"deleted-records\":\"6\",\"removed-files-size\":\"#Masked#\",\"changed-partition-count\":\"2\",\"total-records\":\"10\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"10\",\"total-delete-files\":\"8\",\"total-position-deletes\":\"8\",\"total-equality-deletes\":\"0\"} current-snapshot-timestamp-ms #Masked# default-partition-spec {\"spec-id\":0,\"fields\":[{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1000}]} format-version 2 @@ -325,7 +325,7 @@ Table Parameters: bucketing_version 2 current-schema {\"type\":\"struct\",\"schema-id\":2,\"fields\":[{\"id\":1,\"name\":\"fname\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"},{\"id\":4,\"name\":\"address\",\"required\":false,\"type\":\"string\"}]} current-snapshot-id #Masked# - current-snapshot-summary 
{\"replace-partitions\":\"true\",\"added-data-files\":\"2\",\"added-records\":\"5\",\"added-files-size\":\"#Masked#\",\"changed-partition-count\":\"2\",\"total-records\":\"5\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"2\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\"} + current-snapshot-summary {\"added-data-files\":\"1\",\"deleted-data-files\":\"6\",\"removed-position-delete-files\":\"4\",\"removed-delete-files\":\"4\",\"added-records\":\"3\",\"deleted-records\":\"6\",\"added-files-size\":\"#Masked#\",\"removed-files-size\":\"#Masked#\",\"removed-position-deletes\":\"4\",\"changed-partition-count\":\"1\",\"total-records\":\"5\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"2\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\" [...] current-snapshot-timestamp-ms #Masked# default-partition-spec {\"spec-id\":0,\"fields\":[{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1000}]} format-version 2 @@ -359,4 +359,5 @@ PREHOOK: type: SHOW COMPACTIONS POSTHOOK: query: show compactions POSTHOOK: type: SHOW COMPACTIONS CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId -#Masked# default ice_orc --- MAJOR succeeded #Masked# manual default 0 0 0 --- +#Masked# default ice_orc dept_id=1 MAJOR succeeded #Masked# manual default 0 0 0 --- +#Masked# default ice_orc dept_id=2 MAJOR succeeded #Masked# manual default 0 0 0 --- diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index 9e7b79fd319..cadf4d15006 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -422,6 +422,7 @@ iceberg.llap.query.files=\ iceberg.llap.query.compactor.files=\ 
iceberg_major_compaction_partition_evolution.q,\ + iceberg_major_compaction_partition_evolution2.q,\ iceberg_major_compaction_partitioned.q,\ iceberg_major_compaction_query_metadata.q,\ iceberg_major_compaction_schema_evolution.q,\ diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java index 8634a55cc3e..2e6df97c152 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java @@ -255,8 +255,8 @@ public class Context { public enum RewritePolicy { DEFAULT, - ALL_PARTITIONS, - PARTITION; + PARTITION, + FULL_TABLE; public static RewritePolicy fromString(String rewritePolicy) { if (rewritePolicy == null) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java index ee67cc9a4e4..45204d394c5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.metastore.utils.JavaUtils; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.ddl.DDLOperation; import org.apache.hadoop.hive.ql.ddl.DDLOperationContext; +import org.apache.hadoop.hive.ql.ddl.DDLUtils; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; @@ -89,7 +90,8 @@ public class AlterTableCompactOperation extends DDLOperation<AlterTableCompactDe } //Will directly initiate compaction if an un-partitioned table/a partition is specified in the request - if (desc.getPartitionSpec() != null || !table.isPartitioned()) { + if (desc.getPartitionSpec() != null || !(table.isPartitioned() || + (DDLUtils.isIcebergTable(table) && 
table.getStorageHandler().isPartitioned(table)))) { if (desc.getPartitionSpec() != null) { Optional<String> partitionName = partitionMap.keySet().stream().findFirst(); partitionName.ifPresent(compactionRequest::setPartitionname); @@ -104,6 +106,13 @@ public class AlterTableCompactOperation extends DDLOperation<AlterTableCompactDe compactionRequest, ServerUtils.hostname(), txnHandler, context.getConf()); parseCompactionResponse(compactionResponse, table, partitionMapEntry.getKey()); } + // If Iceberg table had partition evolution, it will create compaction request without partition specification, + // and it will compact all files from old partition specs, besides compacting partitions of current spec in parallel. + if (DDLUtils.isIcebergTable(table) && table.getStorageHandler().hasUndergonePartitionEvolution(table)) { + compactionRequest.setPartitionname(null); + CompactionResponse compactionResponse = txnHandler.compact(compactionRequest); + parseCompactionResponse(compactionResponse, table, compactionRequest.getPartitionname()); + } } return 0; } @@ -135,7 +144,7 @@ public class AlterTableCompactOperation extends DDLOperation<AlterTableCompactDe List<Partition> partitions = new ArrayList<>(); if (desc.getPartitionSpec() == null) { - if (table.isPartitioned()) { + if (table.isPartitioned() || (DDLUtils.isIcebergTable(table) && table.getStorageHandler().isPartitioned(table))) { // Compaction will get initiated for all the potential partitions that meets the criteria partitions = context.getDb().getPartitions(table); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index e688e3f5fdf..4afdc9a7d2d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -4298,7 +4298,7 @@ private void constructOneLBLocationMap(FileStatus fSta, public List<Partition> getPartitions(Table tbl, Map<String, String> partialPartSpec) 
throws HiveException { if (tbl.getStorageHandler() != null && tbl.getStorageHandler().alwaysUnpartitioned()) { - return tbl.getStorageHandler().getPartitions(tbl, partialPartSpec); + return tbl.getStorageHandler().getPartitions(tbl, partialPartSpec, false); } else { return getPartitions(tbl, partialPartSpec, (short)-1); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java index ae948d6e85d..168a91f3c3d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java @@ -778,10 +778,15 @@ public interface HiveStorageHandler extends Configurable { throw new UnsupportedOperationException("Storage handler does not support validating eligibility for compaction"); } + /** + * Returns partitions names for the current table spec that correspond to the provided partition spec. + * @param hmsTable {@link org.apache.hadoop.hive.ql.metadata.Table} table metadata stored in Hive Metastore + * @param partitionSpec Map of Strings {@link java.util.Map} partition specification + * @return List of partition names + */ default List<String> getPartitionNames(org.apache.hadoop.hive.ql.metadata.Table hmsTable, Map<String, String> partitionSpec) throws SemanticException { - throw new UnsupportedOperationException("Storage handler does not support getting partitions " + - "by a partition specification."); + throw new UnsupportedOperationException("Storage handler does not support getting partition names"); } default ColumnInfo getColumnInfo(org.apache.hadoop.hive.ql.metadata.Table hmsTable, String colName) @@ -839,9 +844,31 @@ public interface HiveStorageHandler extends Configurable { */ default List<Partition> getPartitions(org.apache.hadoop.hive.ql.metadata.Table table, Map<String, String> partitionSpec) throws SemanticException { + return getPartitions(table, partitionSpec, true); + } + + 
/** + * Returns a list of partitions based on table and partial partition specification. + * @param table {@link org.apache.hadoop.hive.ql.metadata.Table} table metadata stored in Hive Metastore + * @param partitionSpec Map of Strings {@link java.util.Map} partition specification + * @param latestSpecOnly Specifies if to return only partitions for the latest partition spec + * @return List of Partitions {@link org.apache.hadoop.hive.ql.metadata.Partition} + * @throws SemanticException {@link org.apache.hadoop.hive.ql.parse.SemanticException} + */ + default List<Partition> getPartitions(org.apache.hadoop.hive.ql.metadata.Table table, + Map<String, String> partitionSpec, boolean latestSpecOnly) throws SemanticException { throw new UnsupportedOperationException("Storage handler does not support getting partitions for a table."); } + default boolean isPartitioned(org.apache.hadoop.hive.ql.metadata.Table table) { + throw new UnsupportedOperationException("Storage handler does not support checking if table is partitioned."); + } + + default boolean hasUndergonePartitionEvolution(org.apache.hadoop.hive.ql.metadata.Table table) { + throw new UnsupportedOperationException("Storage handler does not support checking if table " + + "undergone partition evolution."); + } + default boolean supportsMergeFiles() { return false; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java index 70c550375a2..7bcd4ffd558 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java @@ -424,6 +424,10 @@ public class CompactorUtil { private static CompactionType checkForCompaction(final CompactionInfo ci, final ValidWriteIdList writeIds, final StorageDescriptor sd, final Map<String, String> tblProperties, final String runAs, TxnStore txnHandler, HiveConf conf) throws IOException, 
InterruptedException { + if (MetaStoreUtils.isIcebergTable(tblProperties)) { + return ci.type; + } + // If it's marked as too many aborted, we already know we need to compact if (ci.tooManyAborts) { LOG.debug("Found too many aborted transactions for "