This is an automated email from the ASF dual-hosted git repository.

sbadhya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
     new c1d592f4f98 HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables (#5076) (Sourabh Badhya reviewed by Denys Kuzmenko, Krisztian Kasa)
c1d592f4f98 is described below

commit c1d592f4f98e49614ffaed57813983b579b9147f
Author: Sourabh Badhya <iamsbad...@gmail.com>
AuthorDate: Thu Mar 21 15:48:05 2024 +0530

    HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables (#5076) (Sourabh Badhya reviewed by Denys Kuzmenko, Krisztian Kasa)
---
 .../iceberg/mr/hive/HiveIcebergInputFormat.java    |   2 +-
 .../mr/hive/HiveIcebergOutputCommitter.java        |  66 +++
 .../iceberg/mr/hive/HiveIcebergStorageHandler.java |  27 ++
 .../mr/hive/IcebergMergeTaskProperties.java        |  55 +++
 .../test/queries/positive/iceberg_merge_files.q    |  95 +++++
 .../positive/llap/iceberg_merge_files.q.out        | 458 +++++++++++++++++++++
 .../test/resources/testconfiguration.properties    |   2 +
 .../hadoop/hive/ql/io/CombineHiveInputFormat.java  |  13 +-
 .../hive/ql/metadata/HiveStorageHandler.java       |  17 +
 .../hadoop/hive/ql/optimizer/GenMapRedUtils.java   |  41 ++
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java     |  13 +-
 .../ql/plan/ConditionalResolverMergeFiles.java     | 132 ++++--
 .../hadoop/hive/ql/plan/MergeTaskProperties.java   |  30 ++
 13 files changed, 921 insertions(+), 30 deletions(-)

diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
index dd329c122aa..1ea78eeba54 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
@@ -191,7 +191,7 @@ public class HiveIcebergInputFormat extends MapredIcebergInputFormat<Record>
 
   @Override
   public boolean shouldSkipCombine(Path path, Configuration conf) {
-    return true;
+    return false;
   }
 
   @Override
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
index b4d5ce98f59..d9f3116ff84 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
@@ -45,7 +45,10 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.Context.Operation;
 import org.apache.hadoop.hive.ql.Context.RewritePolicy;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.session.SessionStateUtil;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobContext;
@@ -167,6 +170,8 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
           LOG.info("CommitTask found no serialized table in config for table: {}.", output);
         }
       }, IOException.class);
+
+      cleanMergeTaskInputFiles(jobConf, tableExecutor, context);
     } finally {
       if (tableExecutor != null) {
         tableExecutor.shutdown();
@@ -741,4 +746,65 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
       throw new NotFoundException("Can not read or parse committed file: %s", fileForCommitLocation);
     }
   }
+
+  public
List<FileStatus> getOutputFiles(List<JobContext> jobContexts) throws IOException { + List<OutputTable> outputs = collectOutputs(jobContexts); + ExecutorService fileExecutor = fileExecutor(jobContexts.get(0).getJobConf()); + ExecutorService tableExecutor = tableExecutor(jobContexts.get(0).getJobConf(), outputs.size()); + Collection<FileStatus> dataFiles = new ConcurrentLinkedQueue<>(); + try { + Tasks.foreach(outputs.stream().flatMap(kv -> kv.jobContexts.stream() + .map(jobContext -> new SimpleImmutableEntry<>(kv.table, jobContext)))) + .suppressFailureWhenFinished() + .executeWith(tableExecutor) + .onFailure((output, exc) -> LOG.warn("Failed to retrieve merge input file for the table {}", output, exc)) + .run(output -> { + JobContext jobContext = output.getValue(); + JobConf jobConf = jobContext.getJobConf(); + LOG.info("Cleaning job for jobID: {}, table: {}", jobContext.getJobID(), output); + + Table table = output.getKey(); + FileSystem fileSystem = new Path(table.location()).getFileSystem(jobConf); + String jobLocation = generateJobLocation(table.location(), jobConf, jobContext.getJobID()); + // list jobLocation to get number of forCommit files + // we do this because map/reduce num in jobConf is unreliable + // and we have no access to vertex status info + int numTasks = listForCommits(jobConf, jobLocation).size(); + FilesForCommit results = collectResults(numTasks, fileExecutor, table.location(), jobContext, + table.io(), false); + for (DataFile dataFile : results.dataFiles()) { + FileStatus fileStatus = fileSystem.getFileStatus(new Path(dataFile.path().toString())); + dataFiles.add(fileStatus); + } + }, IOException.class); + } finally { + fileExecutor.shutdown(); + if (tableExecutor != null) { + tableExecutor.shutdown(); + } + } + return Lists.newArrayList(dataFiles); + } + + private void cleanMergeTaskInputFiles(JobConf jobConf, + ExecutorService tableExecutor, + TaskAttemptContext context) throws IOException { + // Merge task has merged several files into one. Hence we need to remove the stale files. + // At this stage the file is written and task-committed, but the old files are still present. 
+ if (jobConf.getInputFormat().getClass().isAssignableFrom(CombineHiveInputFormat.class)) { + MapWork mrwork = Utilities.getMapWork(jobConf); + if (mrwork != null) { + List<Path> mergedPaths = mrwork.getInputPaths(); + if (mergedPaths != null) { + Tasks.foreach(mergedPaths) + .retry(3) + .executeWith(tableExecutor) + .run(path -> { + FileSystem fs = path.getFileSystem(context.getJobConf()); + fs.delete(path, true); + }, IOException.class); + } + } + } + } } diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java index 7fcaebbfcad..147107638ac 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java @@ -45,6 +45,7 @@ import org.apache.commons.collections4.ListUtils; import org.apache.commons.lang3.SerializationUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; @@ -107,6 +108,7 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicListDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; import org.apache.hadoop.hive.ql.plan.HiveOperation; +import org.apache.hadoop.hive.ql.plan.MergeTaskProperties; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider; import org.apache.hadoop.hive.ql.session.SessionState; @@ -2029,4 +2031,29 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H throw new SemanticException(String.format("Error while fetching the partitions due to: %s", e)); } } + + @Override + public boolean supportsMergeFiles() { + return true; + } + + @Override + public List<FileStatus> getMergeTaskInputFiles(Properties properties) throws IOException { + String tableName = properties.getProperty(Catalogs.NAME); + String snapshotRef = properties.getProperty(Catalogs.SNAPSHOT_REF); + Configuration configuration = SessionState.getSessionConf(); + List<JobContext> originalContextList = generateJobContext(configuration, tableName, snapshotRef); + List<JobContext> jobContextList = originalContextList.stream() + .map(TezUtil::enrichContextWithVertexId) + .collect(Collectors.toList()); + if (jobContextList.isEmpty()) { + return Collections.emptyList(); + } + return new HiveIcebergOutputCommitter().getOutputFiles(jobContextList); + } + + @Override + public MergeTaskProperties getMergeTaskProperties(Properties properties) { + return new IcebergMergeTaskProperties(properties); + } } diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergMergeTaskProperties.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergMergeTaskProperties.java new file mode 100644 index 00000000000..ff47a801a5b --- /dev/null +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergMergeTaskProperties.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.mr.hive; + +import java.io.IOException; +import java.util.Properties; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor; +import org.apache.hadoop.hive.ql.io.StorageFormatFactory; +import org.apache.hadoop.hive.ql.plan.MergeTaskProperties; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.mr.Catalogs; + +public class IcebergMergeTaskProperties implements MergeTaskProperties { + + private final Properties properties; + private static final StorageFormatFactory storageFormatFactory = new StorageFormatFactory(); + + IcebergMergeTaskProperties(Properties properties) { + this.properties = properties; + } + + public Path getTmpLocation() { + String location = properties.getProperty(Catalogs.LOCATION); + return new Path(location + "/data/"); + } + + public StorageFormatDescriptor getStorageFormatDescriptor() throws IOException { + FileFormat fileFormat = FileFormat.fromString(properties.getProperty(TableProperties.DEFAULT_FILE_FORMAT, + TableProperties.DEFAULT_FILE_FORMAT_DEFAULT)); + StorageFormatDescriptor descriptor = storageFormatFactory.get(fileFormat.name()); + if (descriptor == null) { + throw new IOException("Unsupported storage format descriptor"); + } + return descriptor; + } + +} diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_merge_files.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_merge_files.q new file mode 100644 index 00000000000..5d5cd7aa6d8 --- /dev/null +++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_merge_files.q @@ -0,0 +1,95 @@ +--! qt:dataset:src +set hive.merge.mapredfiles=true; +set hive.merge.mapfiles=true; +set hive.merge.tezfiles=true; +set hive.optimize.sort.dynamic.partition.threshold=-1; +set mapred.reduce.tasks=5; +set hive.blobstore.supported.schemes=hdfs,file; + +-- SORT_QUERY_RESULTS +create table orc_part_source(key string, value string, ds string) partitioned by spec (ds) stored by iceberg stored as orc; +create table orc_source(key string) stored by iceberg stored as orc; + +-- The partitioned table must have 2 files per partition (necessary for merge task) +insert overwrite table orc_part_source partition(ds='102') select * from src; +insert into table orc_part_source partition(ds='102') select * from src; +insert overwrite table orc_part_source partition(ds='103') select * from src; +insert into table orc_part_source partition(ds='103') select * from src; + +-- The unpartitioned table must have 2 files. 
+insert overwrite table orc_source select key from src; +insert into table orc_source select key from src; + +select count(*) from orc_source; +select count(*) from orc_part_source; + +select count(distinct(file_path)) from default.orc_source.files; +select count(distinct(file_path)) from default.orc_part_source.files; + +-- Insert into the tables both for unpartitioned and partitioned cases for ORC formats. +insert into table orc_source select * from orc_source; +insert into table orc_part_source select * from orc_part_source where ds = 102 union all select * from orc_part_source where ds = 103; + +select count(*) from orc_source; +select count(*) from orc_part_source; + +select count(distinct(file_path)) from default.orc_source.files; +select count(distinct(file_path)) from default.orc_part_source.files; + +create table parquet_part_source(key string, value string, ds string) partitioned by spec (ds) stored by iceberg stored as parquet; +create table parquet_source(key string) stored by iceberg stored as parquet; + +-- The partitioned table must have 2 files per partition (necessary for merge task) +insert overwrite table parquet_part_source partition(ds='102') select * from src; +insert into table parquet_part_source partition(ds='102') select * from src; +insert overwrite table parquet_part_source partition(ds='103') select * from src; +insert into table parquet_part_source partition(ds='103') select * from src; + +-- The unpartitioned table must have 2 files. +insert overwrite table parquet_source select key from src; +insert into table parquet_source select key from src; + +select count(*) from parquet_source; +select count(*) from parquet_part_source; + +select count(distinct(file_path)) from default.parquet_source.files; +select count(distinct(file_path)) from default.parquet_part_source.files; + +-- Insert into the tables both for unpartitioned and partitioned cases for Parquet formats. +insert into table parquet_source select * from parquet_source; +insert into table parquet_part_source select * from parquet_part_source where ds = 102 union all select * from orc_part_source where ds = 103; + +select count(*) from parquet_source; +select count(*) from parquet_part_source; + +select count(distinct(file_path)) from default.parquet_source.files; +select count(distinct(file_path)) from default.parquet_part_source.files; + +create table avro_part_source(key string, value string, ds string) partitioned by spec (ds) stored by iceberg stored as avro; +create table avro_source(key string) stored by iceberg stored as avro; + +-- The partitioned table must have 2 files per partition (necessary for merge task) +insert overwrite table avro_part_source partition(ds='102') select * from src; +insert into table avro_part_source partition(ds='102') select * from src; +insert overwrite table avro_part_source partition(ds='103') select * from src; +insert into table avro_part_source partition(ds='103') select * from src; + +-- The unpartitioned table must have 2 files. +insert overwrite table avro_source select key from src; +insert into table avro_source select key from src; + +select count(*) from avro_source; +select count(*) from avro_part_source; + +select count(distinct(file_path)) from default.avro_source.files; +select count(distinct(file_path)) from default.avro_part_source.files; + +-- Insert into the tables both for unpartitioned and partitioned cases for Avro formats. 
+insert into table avro_source select * from avro_source; +insert into table avro_part_source select * from avro_part_source where ds = 102 union all select * from avro_part_source where ds = 103; + +select count(*) from avro_source; +select count(*) from avro_part_source; + +select count(distinct(file_path)) from default.avro_source.files; +select count(distinct(file_path)) from default.avro_part_source.files; diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_merge_files.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_merge_files.q.out new file mode 100644 index 00000000000..a17c1130efc --- /dev/null +++ b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_merge_files.q.out @@ -0,0 +1,458 @@ +PREHOOK: query: create table orc_part_source(key string, value string, ds string) partitioned by spec (ds) stored by iceberg stored as orc +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@orc_part_source +POSTHOOK: query: create table orc_part_source(key string, value string, ds string) partitioned by spec (ds) stored by iceberg stored as orc +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@orc_part_source +PREHOOK: query: create table orc_source(key string) stored by iceberg stored as orc +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@orc_source +POSTHOOK: query: create table orc_source(key string) stored by iceberg stored as orc +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@orc_source +PREHOOK: query: insert overwrite table orc_part_source partition(ds='102') select * from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@orc_part_source@ds=102 +POSTHOOK: query: insert overwrite table orc_part_source partition(ds='102') select * from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@orc_part_source@ds=102 +PREHOOK: query: insert into table orc_part_source partition(ds='102') select * from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@orc_part_source@ds=102 +POSTHOOK: query: insert into table orc_part_source partition(ds='102') select * from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@orc_part_source@ds=102 +PREHOOK: query: insert overwrite table orc_part_source partition(ds='103') select * from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@orc_part_source@ds=103 +POSTHOOK: query: insert overwrite table orc_part_source partition(ds='103') select * from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@orc_part_source@ds=103 +PREHOOK: query: insert into table orc_part_source partition(ds='103') select * from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@orc_part_source@ds=103 +POSTHOOK: query: insert into table orc_part_source partition(ds='103') select * from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@orc_part_source@ds=103 +PREHOOK: query: insert overwrite table orc_source select key from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@orc_source +POSTHOOK: query: insert overwrite table orc_source select key from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@orc_source +PREHOOK: query: insert into table orc_source select 
key from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@orc_source +POSTHOOK: query: insert into table orc_source select key from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@orc_source +PREHOOK: query: select count(*) from orc_source +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_source +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from orc_source +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_source +#### A masked pattern was here #### +1000 +PREHOOK: query: select count(*) from orc_part_source +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_part_source +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from orc_part_source +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_part_source +#### A masked pattern was here #### +2000 +PREHOOK: query: select count(distinct(file_path)) from default.orc_source.files +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_source +#### A masked pattern was here #### +POSTHOOK: query: select count(distinct(file_path)) from default.orc_source.files +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_source +#### A masked pattern was here #### +2 +PREHOOK: query: select count(distinct(file_path)) from default.orc_part_source.files +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_part_source +#### A masked pattern was here #### +POSTHOOK: query: select count(distinct(file_path)) from default.orc_part_source.files +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_part_source +#### A masked pattern was here #### +4 +PREHOOK: query: insert into table orc_source select * from orc_source +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_source +PREHOOK: Output: default@orc_source +POSTHOOK: query: insert into table orc_source select * from orc_source +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_source +POSTHOOK: Output: default@orc_source +PREHOOK: query: insert into table orc_part_source select * from orc_part_source where ds = 102 union all select * from orc_part_source where ds = 103 +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_part_source +PREHOOK: Output: default@orc_part_source +POSTHOOK: query: insert into table orc_part_source select * from orc_part_source where ds = 102 union all select * from orc_part_source where ds = 103 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_part_source +POSTHOOK: Output: default@orc_part_source +PREHOOK: query: select count(*) from orc_source +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_source +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from orc_source +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_source +#### A masked pattern was here #### +2000 +PREHOOK: query: select count(*) from orc_part_source +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_part_source +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from orc_part_source +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_part_source +#### A masked pattern was here #### +4000 +PREHOOK: query: select count(distinct(file_path)) from default.orc_source.files +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_source +#### A masked pattern was here #### +POSTHOOK: query: select count(distinct(file_path)) from default.orc_source.files +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_source +#### A masked pattern was here #### +3 +PREHOOK: query: select count(distinct(file_path)) from default.orc_part_source.files +PREHOOK: type: QUERY +PREHOOK: 
Input: default@orc_part_source +#### A masked pattern was here #### +POSTHOOK: query: select count(distinct(file_path)) from default.orc_part_source.files +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_part_source +#### A masked pattern was here #### +6 +PREHOOK: query: create table parquet_part_source(key string, value string, ds string) partitioned by spec (ds) stored by iceberg stored as parquet +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@parquet_part_source +POSTHOOK: query: create table parquet_part_source(key string, value string, ds string) partitioned by spec (ds) stored by iceberg stored as parquet +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@parquet_part_source +PREHOOK: query: create table parquet_source(key string) stored by iceberg stored as parquet +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@parquet_source +POSTHOOK: query: create table parquet_source(key string) stored by iceberg stored as parquet +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@parquet_source +PREHOOK: query: insert overwrite table parquet_part_source partition(ds='102') select * from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@parquet_part_source@ds=102 +POSTHOOK: query: insert overwrite table parquet_part_source partition(ds='102') select * from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@parquet_part_source@ds=102 +PREHOOK: query: insert into table parquet_part_source partition(ds='102') select * from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@parquet_part_source@ds=102 +POSTHOOK: query: insert into table parquet_part_source partition(ds='102') select * from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@parquet_part_source@ds=102 +PREHOOK: query: insert overwrite table parquet_part_source partition(ds='103') select * from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@parquet_part_source@ds=103 +POSTHOOK: query: insert overwrite table parquet_part_source partition(ds='103') select * from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@parquet_part_source@ds=103 +PREHOOK: query: insert into table parquet_part_source partition(ds='103') select * from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@parquet_part_source@ds=103 +POSTHOOK: query: insert into table parquet_part_source partition(ds='103') select * from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@parquet_part_source@ds=103 +PREHOOK: query: insert overwrite table parquet_source select key from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@parquet_source +POSTHOOK: query: insert overwrite table parquet_source select key from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@parquet_source +PREHOOK: query: insert into table parquet_source select key from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@parquet_source +POSTHOOK: query: insert into table parquet_source select key from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@parquet_source +PREHOOK: query: select count(*) from parquet_source +PREHOOK: type: QUERY +PREHOOK: Input: 
default@parquet_source +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from parquet_source +POSTHOOK: type: QUERY +POSTHOOK: Input: default@parquet_source +#### A masked pattern was here #### +1000 +PREHOOK: query: select count(*) from parquet_part_source +PREHOOK: type: QUERY +PREHOOK: Input: default@parquet_part_source +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from parquet_part_source +POSTHOOK: type: QUERY +POSTHOOK: Input: default@parquet_part_source +#### A masked pattern was here #### +2000 +PREHOOK: query: select count(distinct(file_path)) from default.parquet_source.files +PREHOOK: type: QUERY +PREHOOK: Input: default@parquet_source +#### A masked pattern was here #### +POSTHOOK: query: select count(distinct(file_path)) from default.parquet_source.files +POSTHOOK: type: QUERY +POSTHOOK: Input: default@parquet_source +#### A masked pattern was here #### +2 +PREHOOK: query: select count(distinct(file_path)) from default.parquet_part_source.files +PREHOOK: type: QUERY +PREHOOK: Input: default@parquet_part_source +#### A masked pattern was here #### +POSTHOOK: query: select count(distinct(file_path)) from default.parquet_part_source.files +POSTHOOK: type: QUERY +POSTHOOK: Input: default@parquet_part_source +#### A masked pattern was here #### +4 +PREHOOK: query: insert into table parquet_source select * from parquet_source +PREHOOK: type: QUERY +PREHOOK: Input: default@parquet_source +PREHOOK: Output: default@parquet_source +POSTHOOK: query: insert into table parquet_source select * from parquet_source +POSTHOOK: type: QUERY +POSTHOOK: Input: default@parquet_source +POSTHOOK: Output: default@parquet_source +PREHOOK: query: insert into table parquet_part_source select * from parquet_part_source where ds = 102 union all select * from orc_part_source where ds = 103 +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_part_source +PREHOOK: Input: default@parquet_part_source +PREHOOK: Output: default@parquet_part_source +POSTHOOK: query: insert into table parquet_part_source select * from parquet_part_source where ds = 102 union all select * from orc_part_source where ds = 103 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_part_source +POSTHOOK: Input: default@parquet_part_source +POSTHOOK: Output: default@parquet_part_source +PREHOOK: query: select count(*) from parquet_source +PREHOOK: type: QUERY +PREHOOK: Input: default@parquet_source +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from parquet_source +POSTHOOK: type: QUERY +POSTHOOK: Input: default@parquet_source +#### A masked pattern was here #### +2000 +PREHOOK: query: select count(*) from parquet_part_source +PREHOOK: type: QUERY +PREHOOK: Input: default@parquet_part_source +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from parquet_part_source +POSTHOOK: type: QUERY +POSTHOOK: Input: default@parquet_part_source +#### A masked pattern was here #### +5000 +PREHOOK: query: select count(distinct(file_path)) from default.parquet_source.files +PREHOOK: type: QUERY +PREHOOK: Input: default@parquet_source +#### A masked pattern was here #### +POSTHOOK: query: select count(distinct(file_path)) from default.parquet_source.files +POSTHOOK: type: QUERY +POSTHOOK: Input: default@parquet_source +#### A masked pattern was here #### +3 +PREHOOK: query: select count(distinct(file_path)) from default.parquet_part_source.files +PREHOOK: type: QUERY +PREHOOK: Input: default@parquet_part_source +#### A masked pattern was here #### +POSTHOOK: query: 
select count(distinct(file_path)) from default.parquet_part_source.files +POSTHOOK: type: QUERY +POSTHOOK: Input: default@parquet_part_source +#### A masked pattern was here #### +6 +PREHOOK: query: create table avro_part_source(key string, value string, ds string) partitioned by spec (ds) stored by iceberg stored as avro +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@avro_part_source +POSTHOOK: query: create table avro_part_source(key string, value string, ds string) partitioned by spec (ds) stored by iceberg stored as avro +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@avro_part_source +PREHOOK: query: create table avro_source(key string) stored by iceberg stored as avro +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@avro_source +POSTHOOK: query: create table avro_source(key string) stored by iceberg stored as avro +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@avro_source +PREHOOK: query: insert overwrite table avro_part_source partition(ds='102') select * from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@avro_part_source@ds=102 +POSTHOOK: query: insert overwrite table avro_part_source partition(ds='102') select * from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@avro_part_source@ds=102 +PREHOOK: query: insert into table avro_part_source partition(ds='102') select * from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@avro_part_source@ds=102 +POSTHOOK: query: insert into table avro_part_source partition(ds='102') select * from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@avro_part_source@ds=102 +PREHOOK: query: insert overwrite table avro_part_source partition(ds='103') select * from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@avro_part_source@ds=103 +POSTHOOK: query: insert overwrite table avro_part_source partition(ds='103') select * from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@avro_part_source@ds=103 +PREHOOK: query: insert into table avro_part_source partition(ds='103') select * from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@avro_part_source@ds=103 +POSTHOOK: query: insert into table avro_part_source partition(ds='103') select * from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@avro_part_source@ds=103 +PREHOOK: query: insert overwrite table avro_source select key from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@avro_source +POSTHOOK: query: insert overwrite table avro_source select key from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@avro_source +PREHOOK: query: insert into table avro_source select key from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@avro_source +POSTHOOK: query: insert into table avro_source select key from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@avro_source +PREHOOK: query: select count(*) from avro_source +PREHOOK: type: QUERY +PREHOOK: Input: default@avro_source +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from avro_source +POSTHOOK: type: QUERY +POSTHOOK: Input: default@avro_source +#### A masked pattern was 
here #### +1000 +PREHOOK: query: select count(*) from avro_part_source +PREHOOK: type: QUERY +PREHOOK: Input: default@avro_part_source +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from avro_part_source +POSTHOOK: type: QUERY +POSTHOOK: Input: default@avro_part_source +#### A masked pattern was here #### +2000 +PREHOOK: query: select count(distinct(file_path)) from default.avro_source.files +PREHOOK: type: QUERY +PREHOOK: Input: default@avro_source +#### A masked pattern was here #### +POSTHOOK: query: select count(distinct(file_path)) from default.avro_source.files +POSTHOOK: type: QUERY +POSTHOOK: Input: default@avro_source +#### A masked pattern was here #### +2 +PREHOOK: query: select count(distinct(file_path)) from default.avro_part_source.files +PREHOOK: type: QUERY +PREHOOK: Input: default@avro_part_source +#### A masked pattern was here #### +POSTHOOK: query: select count(distinct(file_path)) from default.avro_part_source.files +POSTHOOK: type: QUERY +POSTHOOK: Input: default@avro_part_source +#### A masked pattern was here #### +4 +PREHOOK: query: insert into table avro_source select * from avro_source +PREHOOK: type: QUERY +PREHOOK: Input: default@avro_source +PREHOOK: Output: default@avro_source +POSTHOOK: query: insert into table avro_source select * from avro_source +POSTHOOK: type: QUERY +POSTHOOK: Input: default@avro_source +POSTHOOK: Output: default@avro_source +PREHOOK: query: insert into table avro_part_source select * from avro_part_source where ds = 102 union all select * from avro_part_source where ds = 103 +PREHOOK: type: QUERY +PREHOOK: Input: default@avro_part_source +PREHOOK: Output: default@avro_part_source +POSTHOOK: query: insert into table avro_part_source select * from avro_part_source where ds = 102 union all select * from avro_part_source where ds = 103 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@avro_part_source +POSTHOOK: Output: default@avro_part_source +PREHOOK: query: select count(*) from avro_source +PREHOOK: type: QUERY +PREHOOK: Input: default@avro_source +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from avro_source +POSTHOOK: type: QUERY +POSTHOOK: Input: default@avro_source +#### A masked pattern was here #### +2000 +PREHOOK: query: select count(*) from avro_part_source +PREHOOK: type: QUERY +PREHOOK: Input: default@avro_part_source +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from avro_part_source +POSTHOOK: type: QUERY +POSTHOOK: Input: default@avro_part_source +#### A masked pattern was here #### +4000 +PREHOOK: query: select count(distinct(file_path)) from default.avro_source.files +PREHOOK: type: QUERY +PREHOOK: Input: default@avro_source +#### A masked pattern was here #### +POSTHOOK: query: select count(distinct(file_path)) from default.avro_source.files +POSTHOOK: type: QUERY +POSTHOOK: Input: default@avro_source +#### A masked pattern was here #### +3 +PREHOOK: query: select count(distinct(file_path)) from default.avro_part_source.files +PREHOOK: type: QUERY +PREHOOK: Input: default@avro_part_source +#### A masked pattern was here #### +POSTHOOK: query: select count(distinct(file_path)) from default.avro_part_source.files +POSTHOOK: type: QUERY +POSTHOOK: Input: default@avro_part_source +#### A masked pattern was here #### +6 diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index 290f4e3acad..f682c6ef7d3 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ 
b/itests/src/test/resources/testconfiguration.properties @@ -410,6 +410,7 @@ erasurecoding.only.query.files=\ erasure_simple.q iceberg.llap.query.files=\ + iceberg_merge_files.q,\ llap_iceberg_read_orc.q,\ llap_iceberg_read_parquet.q,\ vectorized_iceberg_read_mixed.q,\ @@ -425,6 +426,7 @@ iceberg.llap.query.compactor.files=\ iceberg_optimize_table_unpartitioned.q iceberg.llap.only.query.files=\ + iceberg_merge_files.q,\ llap_iceberg_read_orc.q,\ llap_iceberg_read_parquet.q diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java index bd5284254f8..c1afa681ade 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java @@ -33,9 +33,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; - import com.google.common.annotations.VisibleForTesting; - import org.apache.hadoop.hive.common.StringInternUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,6 +56,7 @@ import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; @@ -65,6 +64,8 @@ import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.lib.CombineFileSplit; +import static jodd.util.ClassUtil.isAssignableFrom; + /** * CombineHiveInputFormat is a parameterized InputFormat which looks at the path @@ -370,8 +371,12 @@ public class CombineHiveInputFormat<K extends WritableComparable, V extends Writ PartitionDesc part = HiveFileFormatUtils.getFromPathRecursively( pathToPartitionInfo, path, IOPrepareCache.get().allocatePartitionDescMap()); TableDesc tableDesc = part.getTableDesc(); - if ((tableDesc != null) && tableDesc.isNonNative()) { - return super.getSplits(job, numSplits); + if (tableDesc != null) { + boolean useDefaultFileFormat = part.getInputFileFormatClass() + .isAssignableFrom(tableDesc.getInputFileFormatClass()); + if (tableDesc.isNonNative() && useDefaultFileFormat) { + return super.getSplits(job, numSplits); + } } // Use HiveInputFormat if any of the paths is not splittable diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java index e9a0d139e90..a2c476c9ad5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java @@ -20,10 +20,12 @@ package org.apache.hadoop.hive.ql.metadata; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.Collections; import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.common.classification.InterfaceStability; import org.apache.hadoop.hive.common.type.SnapshotContext; @@ -58,6 +60,7 @@ import org.apache.hadoop.hive.ql.parse.UpdateSemanticAnalyzer; import 
org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; +import org.apache.hadoop.hive.ql.plan.MergeTaskProperties; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider; @@ -747,4 +750,18 @@ public interface HiveStorageHandler extends Configurable { throw new UnsupportedOperationException("Storage handler does not support getting partitions by expression " + "for a table."); } + + default boolean supportsMergeFiles() { + return false; + } + + default List<FileStatus> getMergeTaskInputFiles(Properties properties) throws IOException { + throw new UnsupportedOperationException("Storage handler does not support getting merge input files " + + "for a table."); + } + + default MergeTaskProperties getMergeTaskProperties(Properties properties) { + throw new UnsupportedOperationException("Storage handler does not support getting merge input files " + + "for a table."); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index 525ecfbb13e..1eb256ceb86 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -41,8 +41,11 @@ import org.apache.hadoop.hive.common.StringInternUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.ddl.table.create.CreateTableDesc; +import org.apache.hadoop.hive.ql.ddl.view.create.CreateMaterializedViewDesc; import org.apache.hadoop.hive.ql.exec.ColumnInfo; import org.apache.hadoop.hive.ql.exec.ConditionalTask; import org.apache.hadoop.hive.ql.exec.DemuxOperator; @@ -1720,6 +1723,41 @@ public final class GenMapRedUtils { return newWork; } + private static void setStorageHandlerAndProperties(ConditionalResolverMergeFilesCtx mrCtx, MoveWork work) { + Properties mergeTaskProperties = null; + String storageHandlerClass = null; + if (work.getLoadTableWork() != null) { + // Get the info from the table data + TableDesc tableDesc = work.getLoadTableWork().getTable(); + storageHandlerClass = tableDesc.getProperties().getProperty( + org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE); + mergeTaskProperties = new Properties(tableDesc.getProperties()); + } else { + // Get the info from the create table data + CreateTableDesc createTableDesc = work.getLoadFileWork().getCtasCreateTableDesc(); + String location = null; + if (createTableDesc != null) { + storageHandlerClass = createTableDesc.getStorageHandler(); + mergeTaskProperties = new Properties(); + mergeTaskProperties.put(hive_metastoreConstants.META_TABLE_NAME, createTableDesc.getDbTableName()); + location = createTableDesc.getLocation(); + } else { + CreateMaterializedViewDesc createViewDesc = work.getLoadFileWork().getCreateViewDesc(); + if (createViewDesc != null) { + storageHandlerClass = createViewDesc.getStorageHandler(); + mergeTaskProperties = new Properties(); + mergeTaskProperties.put(hive_metastoreConstants.META_TABLE_NAME, createViewDesc.getViewName()); + 
location = createViewDesc.getLocation(); + } + } + if (location != null) { + mergeTaskProperties.put(hive_metastoreConstants.META_TABLE_LOCATION, location); + } + } + mrCtx.setTaskProperties(mergeTaskProperties); + mrCtx.setStorageHandlerClass(storageHandlerClass); + } + /** * Construct a conditional task given the current leaf task, the MoveWork and the MapredWork. * @@ -1800,6 +1838,9 @@ public final class GenMapRedUtils { cndTsk.setResolver(new ConditionalResolverMergeFiles()); ConditionalResolverMergeFilesCtx mrCtx = new ConditionalResolverMergeFilesCtx(listTasks, condInputPath.toString()); + if (moveTaskToLink != null) { + setStorageHandlerAndProperties(mrCtx, moveTaskToLink.getWork()); + } cndTsk.setResolverCtx(mrCtx); // make the conditional task as the child of the current leaf task diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index b15495a67dc..dd3f75fa665 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -8159,8 +8159,19 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { boolean canBeMerged = (destinationTable == null || !((destinationTable.getNumBuckets() > 0) || (destinationTable.getSortCols() != null && destinationTable.getSortCols().size() > 0))); - // If this table is working with ACID semantics, turn off merging + // If this table is working with ACID semantics or + // if its a delete, update, merge operation that supports merge task, turn off merging canBeMerged &= !destTableIsFullAcid; + if (destinationTable != null && destinationTable.getStorageHandler() != null) { + canBeMerged &= destinationTable.getStorageHandler().supportsMergeFiles(); + // TODO: Support for merge task for update, delete and merge queries + // when storage handler supports it. 
+ if (Context.Operation.UPDATE.equals(ctx.getOperation()) + || Context.Operation.DELETE.equals(ctx.getOperation()) + || Context.Operation.MERGE.equals(ctx.getOperation())) { + canBeMerged = false; + } + } // Generate the partition columns from the parent input if (destType == QBMetaData.DEST_TABLE || destType == QBMetaData.DEST_PARTITION) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java index c5aecaa9cae..05e54798840 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java @@ -28,6 +28,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.LongSummaryStatistics; import java.util.Map; +import java.util.Properties; import java.util.stream.Collectors; import com.google.common.collect.Lists; @@ -37,12 +38,17 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.HiveStatsUtils; +import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.utils.FileUtils; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; +import org.apache.hadoop.hive.ql.metadata.HiveUtils; +import org.apache.hadoop.hive.serde.serdeConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,6 +75,8 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver, private String dir; private DynamicPartitionCtx dpCtx; // merge task could be after dynamic partition insert private ListBucketingCtx lbCtx; + private Properties properties; + private String storageHandlerClass; public ConditionalResolverMergeFilesCtx() { } @@ -125,6 +133,22 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver, public void setLbCtx(ListBucketingCtx lbCtx) { this.lbCtx = lbCtx; } + + public void setTaskProperties(Properties properties) { + this.properties = properties; + } + + public Properties getTaskProperties() { + return properties; + } + + public void setStorageHandlerClass(String className) { + this.storageHandlerClass = className; + } + + public String getStorageHandlerClass() { + return storageHandlerClass; + } } public List<Task<?>> getTasks(HiveConf conf, Object objCtx) { @@ -147,18 +171,21 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver, Path dirPath = new Path(dirName); FileSystem inpFs = dirPath.getFileSystem(conf); DynamicPartitionCtx dpCtx = ctx.getDPCtx(); + HiveStorageHandler storageHandler = HiveUtils.getStorageHandler(conf, ctx.getStorageHandlerClass()); + boolean dirExists = inpFs.exists(dirPath); + boolean useCustomStorageHandler = storageHandler != null && storageHandler.supportsMergeFiles(); + + MapWork work = null; + // For each dynamic partition, check if it needs to be merged. 
+ if (mrTask.getWork() instanceof MapredWork) { + work = ((MapredWork) mrTask.getWork()).getMapWork(); + } else if (mrTask.getWork() instanceof TezWork){ + work = (MapWork) ((TezWork) mrTask.getWork()).getAllWork().get(0); + } else { + work = (MapWork) mrTask.getWork(); + } - if (inpFs.exists(dirPath)) { - // For each dynamic partition, check if it needs to be merged. - MapWork work; - if (mrTask.getWork() instanceof MapredWork) { - work = ((MapredWork) mrTask.getWork()).getMapWork(); - } else if (mrTask.getWork() instanceof TezWork){ - work = (MapWork) ((TezWork) mrTask.getWork()).getAllWork().get(0); - } else { - work = (MapWork) mrTask.getWork(); - } - + if (dirExists) { int lbLevel = (ctx.getLbCtx() == null) ? 0 : ctx.getLbCtx().calculateListBucketingLevel(); boolean manifestFilePresent = false; FileSystem manifestFs = dirPath.getFileSystem(conf); @@ -182,11 +209,11 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver, int dpLbLevel = numDPCols + lbLevel; generateActualTasks(conf, resTsks, trgtSize, avgConditionSize, mvTask, mrTask, - mrAndMvTask, dirPath, inpFs, ctx, work, dpLbLevel, manifestFilePresent); + mrAndMvTask, dirPath, inpFs, ctx, work, dpLbLevel, manifestFilePresent, storageHandler); } else { // no dynamic partitions if(lbLevel == 0) { // static partition without list bucketing - List<FileStatus> manifestFilePaths = new ArrayList<>(); + List<FileStatus> manifestFilePaths = Lists.newArrayList(); long totalSize; if (manifestFilePresent) { manifestFilePaths = getManifestFilePaths(conf, dirPath); @@ -208,15 +235,20 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver, } else { // static partition and list bucketing generateActualTasks(conf, resTsks, trgtSize, avgConditionSize, mvTask, mrTask, - mrAndMvTask, dirPath, inpFs, ctx, work, lbLevel, manifestFilePresent); + mrAndMvTask, dirPath, inpFs, ctx, work, lbLevel, manifestFilePresent, storageHandler); } } + } else if (useCustomStorageHandler) { + generateActualTasks(conf, resTsks, trgtSize, avgConditionSize, mvTask, mrTask, + mrAndMvTask, dirPath, inpFs, ctx, work, 0, false, storageHandler); } else { Utilities.FILE_OP_LOGGER.info("Resolver returning movetask for " + dirPath); resTsks.add(mvTask); } } catch (IOException e) { LOG.warn("Exception while getting tasks", e); + } catch (ClassNotFoundException | HiveException e) { + throw new RuntimeException("Failed to load storage handler: {}" + e.getMessage()); } // Only one of the tasks should ever be added to resTsks @@ -254,18 +286,26 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver, long trgtSize, long avgConditionSize, Task<?> mvTask, Task<?> mrTask, Task<?> mrAndMvTask, Path dirPath, FileSystem inpFs, ConditionalResolverMergeFilesCtx ctx, MapWork work, int dpLbLevel, - boolean manifestFilePresent) - throws IOException { + boolean manifestFilePresent, HiveStorageHandler storageHandler) + throws IOException, ClassNotFoundException { DynamicPartitionCtx dpCtx = ctx.getDPCtx(); List<FileStatus> statusList; - Map<FileStatus, List<FileStatus>> manifestDirToFile = new HashMap<>(); + Map<FileStatus, List<FileStatus>> parentDirToFile = new HashMap<>(); + boolean useCustomStorageHandler = storageHandler != null && storageHandler.supportsMergeFiles(); + MergeTaskProperties mergeProperties = useCustomStorageHandler ? + storageHandler.getMergeTaskProperties(ctx.getTaskProperties()) : null; if (manifestFilePresent) { // Get the list of files from manifest file. 
List<FileStatus> fileStatuses = getManifestFilePaths(conf, dirPath); // Setup the work to include all the files present in the manifest. setupWorkWhenUsingManifestFile(work, fileStatuses, dirPath, false); - manifestDirToFile = getManifestDirs(inpFs, fileStatuses); - statusList = new ArrayList<>(manifestDirToFile.keySet()); + parentDirToFile = getParentDirToFileMap(inpFs, fileStatuses); + statusList = Lists.newArrayList(parentDirToFile.keySet()); + } else if (useCustomStorageHandler) { + List<FileStatus> fileStatuses = storageHandler.getMergeTaskInputFiles(ctx.getTaskProperties()); + setupWorkWithCustomHandler(work, dirPath, mergeProperties); + parentDirToFile = getParentDirToFileMap(inpFs, fileStatuses); + statusList = Lists.newArrayList(parentDirToFile.keySet()); } else { statusList = HiveStatsUtils.getFileStatusRecurse(dirPath, dpLbLevel, inpFs); } @@ -295,8 +335,8 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver, List<Path> toMerge = new ArrayList<>(); for (int i = 0; i < status.length; ++i) { long len; - if (manifestFilePresent) { - len = getMergeSize(manifestDirToFile.get(status[i]), avgConditionSize); + if (manifestFilePresent || useCustomStorageHandler) { + len = getMergeSize(parentDirToFile.get(status[i]), avgConditionSize); } else { len = getMergeSize(inpFs, status[i].getPath(), avgConditionSize); } @@ -309,12 +349,15 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver, Utilities.FILE_OP_LOGGER.warn("merger ignoring invalid DP path " + status[i].getPath()); continue; } + if (useCustomStorageHandler) { + updatePartDescProperties(pDesc, mergeProperties); + } Utilities.FILE_OP_LOGGER.debug("merge resolver will merge " + status[i].getPath()); work.resolveDynamicPartitionStoredAsSubDirsMerge(conf, status[i].getPath(), tblDesc, aliases, pDesc); // Do not add input file since its already added when the manifest file is present. - if (manifestFilePresent) { - toMerge.addAll(manifestDirToFile.get(status[i]) + if (manifestFilePresent || useCustomStorageHandler) { + toMerge.addAll(parentDirToFile.get(status[i]) .stream().map(FileStatus::getPath).collect(Collectors.toList())); } else { toMerge.add(status[i].getPath()); @@ -512,7 +555,30 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver, mapWork.setUseInputPathsDirectly(true); } - private Map<FileStatus, List<FileStatus>> getManifestDirs(FileSystem inpFs, List<FileStatus> fileStatuses) + private void setupWorkWithCustomHandler(MapWork mapWork, Path dirPath, + MergeTaskProperties mergeProperties) throws IOException, ClassNotFoundException { + Map<String, Operator<? extends OperatorDesc>> aliasToWork = mapWork.getAliasToWork(); + Map<Path, PartitionDesc> pathToPartitionInfo = mapWork.getPathToPartitionInfo(); + Operator<? 
extends OperatorDesc> op = aliasToWork.get(dirPath.toString());
+    PartitionDesc partitionDesc = pathToPartitionInfo.get(dirPath);
+    Path tmpDir = mergeProperties.getTmpLocation();
+    if (op != null) {
+      aliasToWork.remove(dirPath.toString());
+      aliasToWork.put(tmpDir.toString(), op);
+      mapWork.setAliasToWork(aliasToWork);
+    }
+    if (partitionDesc != null) {
+      updatePartDescProperties(partitionDesc, mergeProperties);
+      pathToPartitionInfo.remove(dirPath);
+      pathToPartitionInfo.put(tmpDir, partitionDesc);
+      mapWork.setPathToPartitionInfo(pathToPartitionInfo);
+    }
+    mapWork.removePathToAlias(dirPath);
+    mapWork.addPathToAlias(tmpDir, tmpDir.toString());
+    mapWork.setUseInputPathsDirectly(true);
+  }
+
+  private Map<FileStatus, List<FileStatus>> getParentDirToFileMap(FileSystem inpFs, List<FileStatus> fileStatuses)
       throws IOException {
     Map<FileStatus, List<FileStatus>> manifestDirsToPaths = new HashMap<>();
     for (FileStatus fileStatus : fileStatuses) {
@@ -527,4 +593,22 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver,
     }
     return manifestDirsToPaths;
   }
+
+  private void updatePartDescProperties(PartitionDesc partitionDesc,
+      MergeTaskProperties mergeProperties) throws IOException, ClassNotFoundException {
+    if (mergeProperties != null) {
+      String inputFileFormatClassName = mergeProperties.getStorageFormatDescriptor().getInputFormat();
+      String outputFileFormatClassName = mergeProperties.getStorageFormatDescriptor().getOutputFormat();
+      String serdeClassName = mergeProperties.getStorageFormatDescriptor().getSerde();
+      if (inputFileFormatClassName != null) {
+        partitionDesc.setInputFileFormatClass(JavaUtils.loadClass(inputFileFormatClassName));
+      }
+      if (outputFileFormatClassName != null) {
+        partitionDesc.setOutputFileFormatClass(JavaUtils.loadClass(outputFileFormatClassName));
+      }
+      if (serdeClassName != null) {
+        partitionDesc.getTableDesc().getProperties().setProperty(serdeConstants.SERIALIZATION_LIB, serdeClassName);
+      }
+    }
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MergeTaskProperties.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MergeTaskProperties.java
new file mode 100644
index 00000000000..6083b1b117f
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MergeTaskProperties.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor;
+
+import java.io.IOException;
+
+public interface MergeTaskProperties {
+  public Path getTmpLocation();
+
+  public StorageFormatDescriptor getStorageFormatDescriptor() throws IOException;
+}
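
Note for readers following along (not part of the committed patch): the change wires Hive's conditional small-file merge through three new HiveStorageHandler hooks (supportsMergeFiles, getMergeTaskInputFiles, getMergeTaskProperties) plus the MergeTaskProperties contract above. The sketch below only illustrates that contract; the class name, package, the "location" property key and the hard-coded ORC choice are assumptions, while the method signatures and the StorageFormatFactory/StorageFormatDescriptor usage mirror the diff.

    package org.example.hive; // hypothetical package, for illustration only

    import java.io.IOException;
    import java.util.Collections;
    import java.util.List;
    import java.util.Properties;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor;
    import org.apache.hadoop.hive.ql.io.StorageFormatFactory;
    import org.apache.hadoop.hive.ql.plan.MergeTaskProperties;

    // A real handler would extend DefaultStorageHandler / implement HiveStorageHandler;
    // this standalone class just mirrors the three new default methods from the patch.
    public class ExampleMergeAwareStorageHandler {

      // Opt the format into the merge task; the interface default returns false,
      // so existing storage handlers keep their current behavior.
      public boolean supportsMergeFiles() {
        return true;
      }

      // Return the freshly written, task-committed files the merge task should read.
      // (The Iceberg handler derives these from its per-job "forCommit" metadata.)
      public List<FileStatus> getMergeTaskInputFiles(Properties properties) throws IOException {
        return Collections.emptyList(); // placeholder for a format-specific listing
      }

      // Tell the merge resolver where merged output goes and which formats to use,
      // in the spirit of IcebergMergeTaskProperties from this commit.
      public MergeTaskProperties getMergeTaskProperties(Properties properties) {
        return new MergeTaskProperties() {
          private final StorageFormatFactory formats = new StorageFormatFactory();

          @Override
          public Path getTmpLocation() {
            // Assumed layout: merged files land under <table location>/data/
            return new Path(properties.getProperty("location") + "/data/");
          }

          @Override
          public StorageFormatDescriptor getStorageFormatDescriptor() throws IOException {
            StorageFormatDescriptor descriptor = formats.get("ORC"); // assume ORC for the sketch
            if (descriptor == null) {
              throw new IOException("Unsupported storage format descriptor");
            }
            return descriptor;
          }
        };
      }
    }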