This is an automated email from the ASF dual-hosted git repository.

sbadhya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new c1d592f4f98 HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables (#5076) (Sourabh Badhya reviewed by Denys Kuzmenko, Krisztian Kasa)
c1d592f4f98 is described below

commit c1d592f4f98e49614ffaed57813983b579b9147f
Author: Sourabh Badhya <iamsbad...@gmail.com>
AuthorDate: Thu Mar 21 15:48:05 2024 +0530

    HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables (#5076) (Sourabh Badhya reviewed by Denys Kuzmenko, Krisztian Kasa)
---
 .../iceberg/mr/hive/HiveIcebergInputFormat.java    |   2 +-
 .../mr/hive/HiveIcebergOutputCommitter.java        |  66 +++
 .../iceberg/mr/hive/HiveIcebergStorageHandler.java |  27 ++
 .../mr/hive/IcebergMergeTaskProperties.java        |  55 +++
 .../test/queries/positive/iceberg_merge_files.q    |  95 +++++
 .../positive/llap/iceberg_merge_files.q.out        | 458 +++++++++++++++++++++
 .../test/resources/testconfiguration.properties    |   2 +
 .../hadoop/hive/ql/io/CombineHiveInputFormat.java  |  13 +-
 .../hive/ql/metadata/HiveStorageHandler.java       |  17 +
 .../hadoop/hive/ql/optimizer/GenMapRedUtils.java   |  41 ++
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java     |  13 +-
 .../ql/plan/ConditionalResolverMergeFiles.java     | 132 ++++--
 .../hadoop/hive/ql/plan/MergeTaskProperties.java   |  30 ++
 13 files changed, 921 insertions(+), 30 deletions(-)

diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
index dd329c122aa..1ea78eeba54 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
@@ -191,7 +191,7 @@ public class HiveIcebergInputFormat extends MapredIcebergInputFormat<Record>
 
   @Override
   public boolean shouldSkipCombine(Path path, Configuration conf) {
-    return true;
+    return false;
   }
 
   @Override
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
index b4d5ce98f59..d9f3116ff84 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
@@ -45,7 +45,10 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.Context.Operation;
 import org.apache.hadoop.hive.ql.Context.RewritePolicy;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.session.SessionStateUtil;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobContext;
@@ -167,6 +170,8 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
               LOG.info("CommitTask found no serialized table in config for 
table: {}.", output);
             }
           }, IOException.class);
+
+      cleanMergeTaskInputFiles(jobConf, tableExecutor, context);
     } finally {
       if (tableExecutor != null) {
         tableExecutor.shutdown();
@@ -741,4 +746,65 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
      throw new NotFoundException("Can not read or parse committed file: %s", fileForCommitLocation);
     }
   }
+
+  public List<FileStatus> getOutputFiles(List<JobContext> jobContexts) throws IOException {
+    List<OutputTable> outputs = collectOutputs(jobContexts);
+    ExecutorService fileExecutor = fileExecutor(jobContexts.get(0).getJobConf());
+    ExecutorService tableExecutor = tableExecutor(jobContexts.get(0).getJobConf(), outputs.size());
+    Collection<FileStatus> dataFiles = new ConcurrentLinkedQueue<>();
+    try {
+      Tasks.foreach(outputs.stream().flatMap(kv -> kv.jobContexts.stream()
+                      .map(jobContext -> new SimpleImmutableEntry<>(kv.table, jobContext))))
+              .suppressFailureWhenFinished()
+              .executeWith(tableExecutor)
+              .onFailure((output, exc) -> LOG.warn("Failed to retrieve merge input file for the table {}", output, exc))
+              .run(output -> {
+                JobContext jobContext = output.getValue();
+                JobConf jobConf = jobContext.getJobConf();
+                LOG.info("Cleaning job for jobID: {}, table: {}", 
jobContext.getJobID(), output);
+
+                Table table = output.getKey();
+                FileSystem fileSystem = new Path(table.location()).getFileSystem(jobConf);
+                String jobLocation = generateJobLocation(table.location(), jobConf, jobContext.getJobID());
+                // list jobLocation to get number of forCommit files
+                // we do this because map/reduce num in jobConf is unreliable
+                // and we have no access to vertex status info
+                int numTasks = listForCommits(jobConf, jobLocation).size();
+                FilesForCommit results = collectResults(numTasks, fileExecutor, table.location(), jobContext,
+                        table.io(), false);
+                for (DataFile dataFile : results.dataFiles()) {
+                  FileStatus fileStatus = fileSystem.getFileStatus(new Path(dataFile.path().toString()));
+                  dataFiles.add(fileStatus);
+                }
+              }, IOException.class);
+    } finally {
+      fileExecutor.shutdown();
+      if (tableExecutor != null) {
+        tableExecutor.shutdown();
+      }
+    }
+    return Lists.newArrayList(dataFiles);
+  }
+
+  private void cleanMergeTaskInputFiles(JobConf jobConf,
+                                        ExecutorService tableExecutor,
+                                        TaskAttemptContext context) throws IOException {
+    // Merge task has merged several files into one. Hence we need to remove the stale files.
+    // At this stage the file is written and task-committed, but the old files are still present.
+    if (jobConf.getInputFormat().getClass().isAssignableFrom(CombineHiveInputFormat.class)) {
+      MapWork mrwork = Utilities.getMapWork(jobConf);
+      if (mrwork != null) {
+        List<Path> mergedPaths = mrwork.getInputPaths();
+        if (mergedPaths != null) {
+          Tasks.foreach(mergedPaths)
+                  .retry(3)
+                  .executeWith(tableExecutor)
+                  .run(path -> {
+                    FileSystem fs = path.getFileSystem(context.getJobConf());
+                    fs.delete(path, true);
+                  }, IOException.class);
+        }
+      }
+    }
+  }
 }
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 7fcaebbfcad..147107638ac 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -45,6 +45,7 @@ import org.apache.commons.collections4.ListUtils;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
@@ -107,6 +108,7 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicListDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.HiveOperation;
+import org.apache.hadoop.hive.ql.plan.MergeTaskProperties;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -2029,4 +2031,29 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
      throw new SemanticException(String.format("Error while fetching the partitions due to: %s", e));
     }
   }
+
+  @Override
+  public boolean supportsMergeFiles() {
+    return true;
+  }
+
+  @Override
+  public List<FileStatus> getMergeTaskInputFiles(Properties properties) throws IOException {
+    String tableName = properties.getProperty(Catalogs.NAME);
+    String snapshotRef = properties.getProperty(Catalogs.SNAPSHOT_REF);
+    Configuration configuration = SessionState.getSessionConf();
+    List<JobContext> originalContextList = generateJobContext(configuration, tableName, snapshotRef);
+    List<JobContext> jobContextList = originalContextList.stream()
+            .map(TezUtil::enrichContextWithVertexId)
+            .collect(Collectors.toList());
+    if (jobContextList.isEmpty()) {
+      return Collections.emptyList();
+    }
+    return new HiveIcebergOutputCommitter().getOutputFiles(jobContextList);
+  }
+
+  @Override
+  public MergeTaskProperties getMergeTaskProperties(Properties properties) {
+    return new IcebergMergeTaskProperties(properties);
+  }
 }
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergMergeTaskProperties.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergMergeTaskProperties.java
new file mode 100644
index 00000000000..ff47a801a5b
--- /dev/null
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergMergeTaskProperties.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.IOException;
+import java.util.Properties;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor;
+import org.apache.hadoop.hive.ql.io.StorageFormatFactory;
+import org.apache.hadoop.hive.ql.plan.MergeTaskProperties;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.mr.Catalogs;
+
+public class IcebergMergeTaskProperties implements MergeTaskProperties {
+
+  private final Properties properties;
+  private static final StorageFormatFactory storageFormatFactory = new StorageFormatFactory();
+
+  IcebergMergeTaskProperties(Properties properties) {
+    this.properties = properties;
+  }
+
+  public Path getTmpLocation() {
+    String location = properties.getProperty(Catalogs.LOCATION);
+    return new Path(location + "/data/");
+  }
+
+  public StorageFormatDescriptor getStorageFormatDescriptor() throws IOException {
+    FileFormat fileFormat = FileFormat.fromString(properties.getProperty(TableProperties.DEFAULT_FILE_FORMAT,
+            TableProperties.DEFAULT_FILE_FORMAT_DEFAULT));
+    StorageFormatDescriptor descriptor = storageFormatFactory.get(fileFormat.name());
+    if (descriptor == null) {
+      throw new IOException("Unsupported storage format descriptor");
+    }
+    return descriptor;
+  }
+
+}
diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_merge_files.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_merge_files.q
new file mode 100644
index 00000000000..5d5cd7aa6d8
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_merge_files.q
@@ -0,0 +1,95 @@
+--! qt:dataset:src
+set hive.merge.mapredfiles=true;
+set hive.merge.mapfiles=true;
+set hive.merge.tezfiles=true;
+set hive.optimize.sort.dynamic.partition.threshold=-1;
+set mapred.reduce.tasks=5;
+set hive.blobstore.supported.schemes=hdfs,file;
+
+-- SORT_QUERY_RESULTS
+create table orc_part_source(key string, value string, ds string) partitioned by spec (ds) stored by iceberg stored as orc;
+create table orc_source(key string) stored by iceberg stored as orc;
+
+-- The partitioned table must have 2 files per partition (necessary for merge task)
+insert overwrite table orc_part_source partition(ds='102') select * from src;
+insert into table orc_part_source partition(ds='102') select * from src;
+insert overwrite table orc_part_source partition(ds='103') select * from src;
+insert into table orc_part_source partition(ds='103') select * from src;
+
+-- The unpartitioned table must have 2 files.
+insert overwrite table orc_source select key from src;
+insert into table orc_source select key from src;
+
+select count(*) from orc_source;
+select count(*) from orc_part_source;
+
+select count(distinct(file_path)) from default.orc_source.files;
+select count(distinct(file_path)) from default.orc_part_source.files;
+
+-- Insert into the tables both for unpartitioned and partitioned cases for ORC formats.
+insert into table orc_source select * from orc_source;
+insert into table orc_part_source select * from orc_part_source where ds = 102 union all select * from orc_part_source where ds = 103;
+
+select count(*) from orc_source;
+select count(*) from orc_part_source;
+
+select count(distinct(file_path)) from default.orc_source.files;
+select count(distinct(file_path)) from default.orc_part_source.files;
+
+create table parquet_part_source(key string, value string, ds string) partitioned by spec (ds) stored by iceberg stored as parquet;
+create table parquet_source(key string) stored by iceberg stored as parquet;
+
+-- The partitioned table must have 2 files per partition (necessary for merge task)
+insert overwrite table parquet_part_source partition(ds='102') select * from src;
+insert into table parquet_part_source partition(ds='102') select * from src;
+insert overwrite table parquet_part_source partition(ds='103') select * from src;
+insert into table parquet_part_source partition(ds='103') select * from src;
+
+-- The unpartitioned table must have 2 files.
+insert overwrite table parquet_source select key from src;
+insert into table parquet_source select key from src;
+
+select count(*) from parquet_source;
+select count(*) from parquet_part_source;
+
+select count(distinct(file_path)) from default.parquet_source.files;
+select count(distinct(file_path)) from default.parquet_part_source.files;
+
+-- Insert into the tables both for unpartitioned and partitioned cases for Parquet formats.
+insert into table parquet_source select * from parquet_source;
+insert into table parquet_part_source select * from parquet_part_source where ds = 102 union all select * from orc_part_source where ds = 103;
+
+select count(*) from parquet_source;
+select count(*) from parquet_part_source;
+
+select count(distinct(file_path)) from default.parquet_source.files;
+select count(distinct(file_path)) from default.parquet_part_source.files;
+
+create table avro_part_source(key string, value string, ds string) partitioned by spec (ds) stored by iceberg stored as avro;
+create table avro_source(key string) stored by iceberg stored as avro;
+
+-- The partitioned table must have 2 files per partition (necessary for merge task)
+insert overwrite table avro_part_source partition(ds='102') select * from src;
+insert into table avro_part_source partition(ds='102') select * from src;
+insert overwrite table avro_part_source partition(ds='103') select * from src;
+insert into table avro_part_source partition(ds='103') select * from src;
+
+-- The unpartitioned table must have 2 files.
+insert overwrite table avro_source select key from src;
+insert into table avro_source select key from src;
+
+select count(*) from avro_source;
+select count(*) from avro_part_source;
+
+select count(distinct(file_path)) from default.avro_source.files;
+select count(distinct(file_path)) from default.avro_part_source.files;
+
+-- Insert into the tables both for unpartitioned and partitioned cases for Avro formats.
+insert into table avro_source select * from avro_source;
+insert into table avro_part_source select * from avro_part_source where ds = 102 union all select * from avro_part_source where ds = 103;
+
+select count(*) from avro_source;
+select count(*) from avro_part_source;
+
+select count(distinct(file_path)) from default.avro_source.files;
+select count(distinct(file_path)) from default.avro_part_source.files;
diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_merge_files.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_merge_files.q.out
new file mode 100644
index 00000000000..a17c1130efc
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_merge_files.q.out
@@ -0,0 +1,458 @@
+PREHOOK: query: create table orc_part_source(key string, value string, ds string) partitioned by spec (ds) stored by iceberg stored as orc
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@orc_part_source
+POSTHOOK: query: create table orc_part_source(key string, value string, ds string) partitioned by spec (ds) stored by iceberg stored as orc
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@orc_part_source
+PREHOOK: query: create table orc_source(key string) stored by iceberg stored as orc
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@orc_source
+POSTHOOK: query: create table orc_source(key string) stored by iceberg stored as orc
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@orc_source
+PREHOOK: query: insert overwrite table orc_part_source partition(ds='102') select * from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@orc_part_source@ds=102
+POSTHOOK: query: insert overwrite table orc_part_source partition(ds='102') select * from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@orc_part_source@ds=102
+PREHOOK: query: insert into table orc_part_source partition(ds='102') select * from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@orc_part_source@ds=102
+POSTHOOK: query: insert into table orc_part_source partition(ds='102') select * from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@orc_part_source@ds=102
+PREHOOK: query: insert overwrite table orc_part_source partition(ds='103') select * from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@orc_part_source@ds=103
+POSTHOOK: query: insert overwrite table orc_part_source partition(ds='103') select * from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@orc_part_source@ds=103
+PREHOOK: query: insert into table orc_part_source partition(ds='103') select * from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@orc_part_source@ds=103
+POSTHOOK: query: insert into table orc_part_source partition(ds='103') select * from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@orc_part_source@ds=103
+PREHOOK: query: insert overwrite table orc_source select key from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@orc_source
+POSTHOOK: query: insert overwrite table orc_source select key from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@orc_source
+PREHOOK: query: insert into table orc_source select key from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@orc_source
+POSTHOOK: query: insert into table orc_source select key from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@orc_source
+PREHOOK: query: select count(*) from orc_source
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orc_source
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from orc_source
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orc_source
+#### A masked pattern was here ####
+1000
+PREHOOK: query: select count(*) from orc_part_source
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orc_part_source
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from orc_part_source
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orc_part_source
+#### A masked pattern was here ####
+2000
+PREHOOK: query: select count(distinct(file_path)) from default.orc_source.files
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orc_source
+#### A masked pattern was here ####
+POSTHOOK: query: select count(distinct(file_path)) from default.orc_source.files
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orc_source
+#### A masked pattern was here ####
+2
+PREHOOK: query: select count(distinct(file_path)) from default.orc_part_source.files
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orc_part_source
+#### A masked pattern was here ####
+POSTHOOK: query: select count(distinct(file_path)) from default.orc_part_source.files
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orc_part_source
+#### A masked pattern was here ####
+4
+PREHOOK: query: insert into table orc_source select * from orc_source
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orc_source
+PREHOOK: Output: default@orc_source
+POSTHOOK: query: insert into table orc_source select * from orc_source
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orc_source
+POSTHOOK: Output: default@orc_source
+PREHOOK: query: insert into table orc_part_source select * from orc_part_source where ds = 102 union all select * from orc_part_source where ds = 103
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orc_part_source
+PREHOOK: Output: default@orc_part_source
+POSTHOOK: query: insert into table orc_part_source select * from orc_part_source where ds = 102 union all select * from orc_part_source where ds = 103
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orc_part_source
+POSTHOOK: Output: default@orc_part_source
+PREHOOK: query: select count(*) from orc_source
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orc_source
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from orc_source
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orc_source
+#### A masked pattern was here ####
+2000
+PREHOOK: query: select count(*) from orc_part_source
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orc_part_source
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from orc_part_source
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orc_part_source
+#### A masked pattern was here ####
+4000
+PREHOOK: query: select count(distinct(file_path)) from default.orc_source.files
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orc_source
+#### A masked pattern was here ####
+POSTHOOK: query: select count(distinct(file_path)) from default.orc_source.files
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orc_source
+#### A masked pattern was here ####
+3
+PREHOOK: query: select count(distinct(file_path)) from default.orc_part_source.files
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orc_part_source
+#### A masked pattern was here ####
+POSTHOOK: query: select count(distinct(file_path)) from default.orc_part_source.files
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orc_part_source
+#### A masked pattern was here ####
+6
+PREHOOK: query: create table parquet_part_source(key string, value string, ds string) partitioned by spec (ds) stored by iceberg stored as parquet
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@parquet_part_source
+POSTHOOK: query: create table parquet_part_source(key string, value string, ds string) partitioned by spec (ds) stored by iceberg stored as parquet
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@parquet_part_source
+PREHOOK: query: create table parquet_source(key string) stored by iceberg stored as parquet
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@parquet_source
+POSTHOOK: query: create table parquet_source(key string) stored by iceberg stored as parquet
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@parquet_source
+PREHOOK: query: insert overwrite table parquet_part_source partition(ds='102') select * from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@parquet_part_source@ds=102
+POSTHOOK: query: insert overwrite table parquet_part_source partition(ds='102') select * from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@parquet_part_source@ds=102
+PREHOOK: query: insert into table parquet_part_source partition(ds='102') select * from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@parquet_part_source@ds=102
+POSTHOOK: query: insert into table parquet_part_source partition(ds='102') select * from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@parquet_part_source@ds=102
+PREHOOK: query: insert overwrite table parquet_part_source partition(ds='103') select * from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@parquet_part_source@ds=103
+POSTHOOK: query: insert overwrite table parquet_part_source partition(ds='103') select * from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@parquet_part_source@ds=103
+PREHOOK: query: insert into table parquet_part_source partition(ds='103') select * from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@parquet_part_source@ds=103
+POSTHOOK: query: insert into table parquet_part_source partition(ds='103') select * from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@parquet_part_source@ds=103
+PREHOOK: query: insert overwrite table parquet_source select key from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@parquet_source
+POSTHOOK: query: insert overwrite table parquet_source select key from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@parquet_source
+PREHOOK: query: insert into table parquet_source select key from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@parquet_source
+POSTHOOK: query: insert into table parquet_source select key from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@parquet_source
+PREHOOK: query: select count(*) from parquet_source
+PREHOOK: type: QUERY
+PREHOOK: Input: default@parquet_source
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from parquet_source
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@parquet_source
+#### A masked pattern was here ####
+1000
+PREHOOK: query: select count(*) from parquet_part_source
+PREHOOK: type: QUERY
+PREHOOK: Input: default@parquet_part_source
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from parquet_part_source
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@parquet_part_source
+#### A masked pattern was here ####
+2000
+PREHOOK: query: select count(distinct(file_path)) from default.parquet_source.files
+PREHOOK: type: QUERY
+PREHOOK: Input: default@parquet_source
+#### A masked pattern was here ####
+POSTHOOK: query: select count(distinct(file_path)) from default.parquet_source.files
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@parquet_source
+#### A masked pattern was here ####
+2
+PREHOOK: query: select count(distinct(file_path)) from default.parquet_part_source.files
+PREHOOK: type: QUERY
+PREHOOK: Input: default@parquet_part_source
+#### A masked pattern was here ####
+POSTHOOK: query: select count(distinct(file_path)) from default.parquet_part_source.files
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@parquet_part_source
+#### A masked pattern was here ####
+4
+PREHOOK: query: insert into table parquet_source select * from parquet_source
+PREHOOK: type: QUERY
+PREHOOK: Input: default@parquet_source
+PREHOOK: Output: default@parquet_source
+POSTHOOK: query: insert into table parquet_source select * from parquet_source
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@parquet_source
+POSTHOOK: Output: default@parquet_source
+PREHOOK: query: insert into table parquet_part_source select * from parquet_part_source where ds = 102 union all select * from orc_part_source where ds = 103
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orc_part_source
+PREHOOK: Input: default@parquet_part_source
+PREHOOK: Output: default@parquet_part_source
+POSTHOOK: query: insert into table parquet_part_source select * from parquet_part_source where ds = 102 union all select * from orc_part_source where ds = 103
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orc_part_source
+POSTHOOK: Input: default@parquet_part_source
+POSTHOOK: Output: default@parquet_part_source
+PREHOOK: query: select count(*) from parquet_source
+PREHOOK: type: QUERY
+PREHOOK: Input: default@parquet_source
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from parquet_source
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@parquet_source
+#### A masked pattern was here ####
+2000
+PREHOOK: query: select count(*) from parquet_part_source
+PREHOOK: type: QUERY
+PREHOOK: Input: default@parquet_part_source
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from parquet_part_source
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@parquet_part_source
+#### A masked pattern was here ####
+5000
+PREHOOK: query: select count(distinct(file_path)) from default.parquet_source.files
+PREHOOK: type: QUERY
+PREHOOK: Input: default@parquet_source
+#### A masked pattern was here ####
+POSTHOOK: query: select count(distinct(file_path)) from default.parquet_source.files
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@parquet_source
+#### A masked pattern was here ####
+3
+PREHOOK: query: select count(distinct(file_path)) from default.parquet_part_source.files
+PREHOOK: type: QUERY
+PREHOOK: Input: default@parquet_part_source
+#### A masked pattern was here ####
+POSTHOOK: query: select count(distinct(file_path)) from default.parquet_part_source.files
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@parquet_part_source
+#### A masked pattern was here ####
+6
+PREHOOK: query: create table avro_part_source(key string, value string, ds string) partitioned by spec (ds) stored by iceberg stored as avro
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@avro_part_source
+POSTHOOK: query: create table avro_part_source(key string, value string, ds string) partitioned by spec (ds) stored by iceberg stored as avro
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@avro_part_source
+PREHOOK: query: create table avro_source(key string) stored by iceberg stored as avro
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@avro_source
+POSTHOOK: query: create table avro_source(key string) stored by iceberg stored as avro
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@avro_source
+PREHOOK: query: insert overwrite table avro_part_source partition(ds='102') select * from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@avro_part_source@ds=102
+POSTHOOK: query: insert overwrite table avro_part_source partition(ds='102') select * from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@avro_part_source@ds=102
+PREHOOK: query: insert into table avro_part_source partition(ds='102') select * from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@avro_part_source@ds=102
+POSTHOOK: query: insert into table avro_part_source partition(ds='102') select * from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@avro_part_source@ds=102
+PREHOOK: query: insert overwrite table avro_part_source partition(ds='103') select * from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@avro_part_source@ds=103
+POSTHOOK: query: insert overwrite table avro_part_source partition(ds='103') select * from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@avro_part_source@ds=103
+PREHOOK: query: insert into table avro_part_source partition(ds='103') select * from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@avro_part_source@ds=103
+POSTHOOK: query: insert into table avro_part_source partition(ds='103') select * from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@avro_part_source@ds=103
+PREHOOK: query: insert overwrite table avro_source select key from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@avro_source
+POSTHOOK: query: insert overwrite table avro_source select key from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@avro_source
+PREHOOK: query: insert into table avro_source select key from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@avro_source
+POSTHOOK: query: insert into table avro_source select key from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@avro_source
+PREHOOK: query: select count(*) from avro_source
+PREHOOK: type: QUERY
+PREHOOK: Input: default@avro_source
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from avro_source
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@avro_source
+#### A masked pattern was here ####
+1000
+PREHOOK: query: select count(*) from avro_part_source
+PREHOOK: type: QUERY
+PREHOOK: Input: default@avro_part_source
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from avro_part_source
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@avro_part_source
+#### A masked pattern was here ####
+2000
+PREHOOK: query: select count(distinct(file_path)) from default.avro_source.files
+PREHOOK: type: QUERY
+PREHOOK: Input: default@avro_source
+#### A masked pattern was here ####
+POSTHOOK: query: select count(distinct(file_path)) from default.avro_source.files
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@avro_source
+#### A masked pattern was here ####
+2
+PREHOOK: query: select count(distinct(file_path)) from default.avro_part_source.files
+PREHOOK: type: QUERY
+PREHOOK: Input: default@avro_part_source
+#### A masked pattern was here ####
+POSTHOOK: query: select count(distinct(file_path)) from default.avro_part_source.files
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@avro_part_source
+#### A masked pattern was here ####
+4
+PREHOOK: query: insert into table avro_source select * from avro_source
+PREHOOK: type: QUERY
+PREHOOK: Input: default@avro_source
+PREHOOK: Output: default@avro_source
+POSTHOOK: query: insert into table avro_source select * from avro_source
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@avro_source
+POSTHOOK: Output: default@avro_source
+PREHOOK: query: insert into table avro_part_source select * from avro_part_source where ds = 102 union all select * from avro_part_source where ds = 103
+PREHOOK: type: QUERY
+PREHOOK: Input: default@avro_part_source
+PREHOOK: Output: default@avro_part_source
+POSTHOOK: query: insert into table avro_part_source select * from avro_part_source where ds = 102 union all select * from avro_part_source where ds = 103
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@avro_part_source
+POSTHOOK: Output: default@avro_part_source
+PREHOOK: query: select count(*) from avro_source
+PREHOOK: type: QUERY
+PREHOOK: Input: default@avro_source
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from avro_source
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@avro_source
+#### A masked pattern was here ####
+2000
+PREHOOK: query: select count(*) from avro_part_source
+PREHOOK: type: QUERY
+PREHOOK: Input: default@avro_part_source
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from avro_part_source
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@avro_part_source
+#### A masked pattern was here ####
+4000
+PREHOOK: query: select count(distinct(file_path)) from default.avro_source.files
+PREHOOK: type: QUERY
+PREHOOK: Input: default@avro_source
+#### A masked pattern was here ####
+POSTHOOK: query: select count(distinct(file_path)) from default.avro_source.files
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@avro_source
+#### A masked pattern was here ####
+3
+PREHOOK: query: select count(distinct(file_path)) from default.avro_part_source.files
+PREHOOK: type: QUERY
+PREHOOK: Input: default@avro_part_source
+#### A masked pattern was here ####
+POSTHOOK: query: select count(distinct(file_path)) from default.avro_part_source.files
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@avro_part_source
+#### A masked pattern was here ####
+6
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 290f4e3acad..f682c6ef7d3 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -410,6 +410,7 @@ erasurecoding.only.query.files=\
   erasure_simple.q
 
 iceberg.llap.query.files=\
+  iceberg_merge_files.q,\
   llap_iceberg_read_orc.q,\
   llap_iceberg_read_parquet.q,\
   vectorized_iceberg_read_mixed.q,\
@@ -425,6 +426,7 @@ iceberg.llap.query.compactor.files=\
   iceberg_optimize_table_unpartitioned.q
 
 iceberg.llap.only.query.files=\
+  iceberg_merge_files.q,\
   llap_iceberg_read_orc.q,\
   llap_iceberg_read_parquet.q
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
index bd5284254f8..c1afa681ade 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
@@ -33,9 +33,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-
 import com.google.common.annotations.VisibleForTesting;
-
 import org.apache.hadoop.hive.common.StringInternUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,6 +56,7 @@ import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -65,6 +64,8 @@ import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.lib.CombineFileSplit;
 
+import static jodd.util.ClassUtil.isAssignableFrom;
+
 
 /**
  * CombineHiveInputFormat is a parameterized InputFormat which looks at the path
@@ -370,8 +371,12 @@ public class CombineHiveInputFormat<K extends WritableComparable, V extends Writ
       PartitionDesc part = HiveFileFormatUtils.getFromPathRecursively(
          pathToPartitionInfo, path, IOPrepareCache.get().allocatePartitionDescMap());
       TableDesc tableDesc = part.getTableDesc();
-      if ((tableDesc != null) && tableDesc.isNonNative()) {
-        return super.getSplits(job, numSplits);
+      if (tableDesc != null) {
+        boolean useDefaultFileFormat = part.getInputFileFormatClass()
+                .isAssignableFrom(tableDesc.getInputFileFormatClass());
+        if (tableDesc.isNonNative() && useDefaultFileFormat) {
+          return super.getSplits(job, numSplits);
+        }
       }
 
       // Use HiveInputFormat if any of the paths is not splittable
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
index e9a0d139e90..a2c476c9ad5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
@@ -20,10 +20,12 @@ package org.apache.hadoop.hive.ql.metadata;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Collections;
 import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
 import org.apache.hadoop.hive.common.classification.InterfaceStability;
 import org.apache.hadoop.hive.common.type.SnapshotContext;
@@ -58,6 +60,7 @@ import org.apache.hadoop.hive.ql.parse.UpdateSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.MergeTaskProperties;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
@@ -747,4 +750,18 @@ public interface HiveStorageHandler extends Configurable {
     throw new UnsupportedOperationException("Storage handler does not support getting partitions by expression " +
             "for a table.");
   }
+
+  default boolean supportsMergeFiles() {
+    return false;
+  }
+
+  default List<FileStatus> getMergeTaskInputFiles(Properties properties) throws IOException {
+    throw new UnsupportedOperationException("Storage handler does not support getting merge input files " +
+            "for a table.");
+  }
+
+  default MergeTaskProperties getMergeTaskProperties(Properties properties) {
+    throw new UnsupportedOperationException("Storage handler does not support getting merge input files " +
+            "for a table.");
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index 525ecfbb13e..1eb256ceb86 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -41,8 +41,11 @@ import org.apache.hadoop.hive.common.StringInternUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.ddl.table.create.CreateTableDesc;
+import org.apache.hadoop.hive.ql.ddl.view.create.CreateMaterializedViewDesc;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.DemuxOperator;
@@ -1720,6 +1723,41 @@ public final class GenMapRedUtils {
     return newWork;
   }
 
+  private static void setStorageHandlerAndProperties(ConditionalResolverMergeFilesCtx mrCtx, MoveWork work) {
+    Properties mergeTaskProperties = null;
+    String storageHandlerClass = null;
+    if (work.getLoadTableWork() != null) {
+      // Get the info from the table data
+      TableDesc tableDesc = work.getLoadTableWork().getTable();
+      storageHandlerClass = tableDesc.getProperties().getProperty(
+              org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE);
+      mergeTaskProperties = new Properties(tableDesc.getProperties());
+    } else {
+      // Get the info from the create table data
+      CreateTableDesc createTableDesc = work.getLoadFileWork().getCtasCreateTableDesc();
+      String location = null;
+      if (createTableDesc != null) {
+        storageHandlerClass = createTableDesc.getStorageHandler();
+        mergeTaskProperties = new Properties();
+        mergeTaskProperties.put(hive_metastoreConstants.META_TABLE_NAME, createTableDesc.getDbTableName());
+        location = createTableDesc.getLocation();
+      } else {
+        CreateMaterializedViewDesc createViewDesc = work.getLoadFileWork().getCreateViewDesc();
+        if (createViewDesc != null) {
+          storageHandlerClass = createViewDesc.getStorageHandler();
+          mergeTaskProperties = new Properties();
+          mergeTaskProperties.put(hive_metastoreConstants.META_TABLE_NAME, createViewDesc.getViewName());
+          location = createViewDesc.getLocation();
+        }
+      }
+      if (location != null) {
+        mergeTaskProperties.put(hive_metastoreConstants.META_TABLE_LOCATION, location);
+      }
+    }
+    mrCtx.setTaskProperties(mergeTaskProperties);
+    mrCtx.setStorageHandlerClass(storageHandlerClass);
+  }
+
   /**
   * Construct a conditional task given the current leaf task, the MoveWork and the MapredWork.
    *
@@ -1800,6 +1838,9 @@ public final class GenMapRedUtils {
     cndTsk.setResolver(new ConditionalResolverMergeFiles());
     ConditionalResolverMergeFilesCtx mrCtx =
        new ConditionalResolverMergeFilesCtx(listTasks, condInputPath.toString());
+    if (moveTaskToLink != null) {
+      setStorageHandlerAndProperties(mrCtx, moveTaskToLink.getWork());
+    }
     cndTsk.setResolverCtx(mrCtx);
 
     // make the conditional task as the child of the current leaf task
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index b15495a67dc..dd3f75fa665 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -8159,8 +8159,19 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     boolean canBeMerged = (destinationTable == null || !((destinationTable.getNumBuckets() > 0) ||
         (destinationTable.getSortCols() != null && destinationTable.getSortCols().size() > 0)));
 
-    // If this table is working with ACID semantics, turn off merging
+    // If this table is working with ACID semantics or
+    // if its a delete, update, merge operation that supports merge task, turn off merging
     canBeMerged &= !destTableIsFullAcid;
+    if (destinationTable != null && destinationTable.getStorageHandler() != null) {
+      canBeMerged &= destinationTable.getStorageHandler().supportsMergeFiles();
+      // TODO: Support for merge task for update, delete and merge queries
+      //  when storage handler supports it.
+      if (Context.Operation.UPDATE.equals(ctx.getOperation())
+              || Context.Operation.DELETE.equals(ctx.getOperation())
+              || Context.Operation.MERGE.equals(ctx.getOperation())) {
+        canBeMerged = false;
+      }
+    }
 
     // Generate the partition columns from the parent input
     if (destType == QBMetaData.DEST_TABLE || destType == QBMetaData.DEST_PARTITION) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
index c5aecaa9cae..05e54798840 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
@@ -28,6 +28,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.LongSummaryStatistics;
 import java.util.Map;
+import java.util.Properties;
 import java.util.stream.Collectors;
 
 import com.google.common.collect.Lists;
@@ -37,12 +38,17 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.HiveStatsUtils;
+import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.utils.FileUtils;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+import org.apache.hadoop.hive.serde.serdeConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,6 +75,8 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver,
     private String dir;
     private DynamicPartitionCtx dpCtx; // merge task could be after dynamic partition insert
     private ListBucketingCtx lbCtx;
+    private Properties properties;
+    private String storageHandlerClass;
 
     public ConditionalResolverMergeFilesCtx() {
     }
@@ -125,6 +133,22 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver,
     public void setLbCtx(ListBucketingCtx lbCtx) {
       this.lbCtx = lbCtx;
     }
+
+    public void setTaskProperties(Properties properties) {
+      this.properties = properties;
+    }
+
+    public Properties getTaskProperties() {
+      return properties;
+    }
+
+    public void setStorageHandlerClass(String className) {
+      this.storageHandlerClass = className;
+    }
+
+    public String getStorageHandlerClass() {
+      return storageHandlerClass;
+    }
   }
 
   public List<Task<?>> getTasks(HiveConf conf, Object objCtx) {
@@ -147,18 +171,21 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver,
       Path dirPath = new Path(dirName);
       FileSystem inpFs = dirPath.getFileSystem(conf);
       DynamicPartitionCtx dpCtx = ctx.getDPCtx();
+      HiveStorageHandler storageHandler = HiveUtils.getStorageHandler(conf, ctx.getStorageHandlerClass());
+      boolean dirExists = inpFs.exists(dirPath);
+      boolean useCustomStorageHandler = storageHandler != null && storageHandler.supportsMergeFiles();
+
+      MapWork work = null;
+      // For each dynamic partition, check if it needs to be merged.
+      if (mrTask.getWork() instanceof MapredWork) {
+        work = ((MapredWork) mrTask.getWork()).getMapWork();
+      } else if (mrTask.getWork() instanceof TezWork){
+        work = (MapWork) ((TezWork) mrTask.getWork()).getAllWork().get(0);
+      } else {
+        work = (MapWork) mrTask.getWork();
+      }
 
-      if (inpFs.exists(dirPath)) {
-        // For each dynamic partition, check if it needs to be merged.
-        MapWork work;
-        if (mrTask.getWork() instanceof MapredWork) {
-          work = ((MapredWork) mrTask.getWork()).getMapWork();
-        } else if (mrTask.getWork() instanceof TezWork){
-          work = (MapWork) ((TezWork) mrTask.getWork()).getAllWork().get(0);
-        } else {
-          work = (MapWork) mrTask.getWork();
-        }
-
+      if (dirExists) {
         int lbLevel = (ctx.getLbCtx() == null) ? 0 : ctx.getLbCtx().calculateListBucketingLevel();
         boolean manifestFilePresent = false;
         FileSystem manifestFs = dirPath.getFileSystem(conf);
@@ -182,11 +209,11 @@ public class ConditionalResolverMergeFiles implements 
ConditionalResolver,
           int dpLbLevel = numDPCols + lbLevel;
 
           generateActualTasks(conf, resTsks, trgtSize, avgConditionSize, mvTask, mrTask,
-              mrAndMvTask, dirPath, inpFs, ctx, work, dpLbLevel, manifestFilePresent);
+              mrAndMvTask, dirPath, inpFs, ctx, work, dpLbLevel, manifestFilePresent, storageHandler);
         } else { // no dynamic partitions
           if(lbLevel == 0) {
             // static partition without list bucketing
-            List<FileStatus> manifestFilePaths = new ArrayList<>();
+            List<FileStatus> manifestFilePaths = Lists.newArrayList();
             long totalSize;
             if (manifestFilePresent) {
               manifestFilePaths = getManifestFilePaths(conf, dirPath);
@@ -208,15 +235,20 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver,
           } else {
             // static partition and list bucketing
             generateActualTasks(conf, resTsks, trgtSize, avgConditionSize, mvTask, mrTask,
-                mrAndMvTask, dirPath, inpFs, ctx, work, lbLevel, manifestFilePresent);
+                mrAndMvTask, dirPath, inpFs, ctx, work, lbLevel, manifestFilePresent, storageHandler);
           }
         }
+      } else if (useCustomStorageHandler) {
+        generateActualTasks(conf, resTsks, trgtSize, avgConditionSize, mvTask, mrTask,
+                mrAndMvTask, dirPath, inpFs, ctx, work, 0, false, storageHandler);
       } else {
         Utilities.FILE_OP_LOGGER.info("Resolver returning movetask for " + 
dirPath);
         resTsks.add(mvTask);
       }
     } catch (IOException e) {
       LOG.warn("Exception while getting tasks", e);
+    } catch (ClassNotFoundException | HiveException e) {
+      throw new RuntimeException("Failed to load storage handler: {}" + e.getMessage());
     }
 
     // Only one of the tasks should ever be added to resTsks
@@ -254,18 +286,26 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver,
       long trgtSize, long avgConditionSize, Task<?> mvTask,
       Task<?> mrTask, Task<?> mrAndMvTask, Path dirPath,
       FileSystem inpFs, ConditionalResolverMergeFilesCtx ctx, MapWork work, int dpLbLevel,
-      boolean manifestFilePresent)
-      throws IOException {
+      boolean manifestFilePresent, HiveStorageHandler storageHandler)
+      throws IOException, ClassNotFoundException {
     DynamicPartitionCtx dpCtx = ctx.getDPCtx();
     List<FileStatus> statusList;
-    Map<FileStatus, List<FileStatus>> manifestDirToFile = new HashMap<>();
+    Map<FileStatus, List<FileStatus>> parentDirToFile = new HashMap<>();
+    boolean useCustomStorageHandler = storageHandler != null && storageHandler.supportsMergeFiles();
+    MergeTaskProperties mergeProperties = useCustomStorageHandler ?
+            storageHandler.getMergeTaskProperties(ctx.getTaskProperties()) : null;
     if (manifestFilePresent) {
       // Get the list of files from manifest file.
       List<FileStatus> fileStatuses = getManifestFilePaths(conf, dirPath);
       // Setup the work to include all the files present in the manifest.
       setupWorkWhenUsingManifestFile(work, fileStatuses, dirPath, false);
-      manifestDirToFile = getManifestDirs(inpFs, fileStatuses);
-      statusList = new ArrayList<>(manifestDirToFile.keySet());
+      parentDirToFile = getParentDirToFileMap(inpFs, fileStatuses);
+      statusList = Lists.newArrayList(parentDirToFile.keySet());
+    } else if (useCustomStorageHandler) {
+      List<FileStatus> fileStatuses = storageHandler.getMergeTaskInputFiles(ctx.getTaskProperties());
+      setupWorkWithCustomHandler(work, dirPath, mergeProperties);
+      parentDirToFile = getParentDirToFileMap(inpFs, fileStatuses);
+      statusList = Lists.newArrayList(parentDirToFile.keySet());
     } else {
       statusList = HiveStatsUtils.getFileStatusRecurse(dirPath, dpLbLevel, inpFs);
     }
@@ -295,8 +335,8 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver,
     List<Path> toMerge = new ArrayList<>();
     for (int i = 0; i < status.length; ++i) {
       long len;
-      if (manifestFilePresent) {
-        len = getMergeSize(manifestDirToFile.get(status[i]), avgConditionSize);
+      if (manifestFilePresent || useCustomStorageHandler) {
+        len = getMergeSize(parentDirToFile.get(status[i]), avgConditionSize);
       } else {
         len = getMergeSize(inpFs, status[i].getPath(), avgConditionSize);
       }
@@ -309,12 +349,15 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver,
           Utilities.FILE_OP_LOGGER.warn("merger ignoring invalid DP path " + 
status[i].getPath());
           continue;
         }
+        if (useCustomStorageHandler) {
+          updatePartDescProperties(pDesc, mergeProperties);
+        }
         Utilities.FILE_OP_LOGGER.debug("merge resolver will merge " + 
status[i].getPath());
         work.resolveDynamicPartitionStoredAsSubDirsMerge(conf, 
status[i].getPath(), tblDesc,
             aliases, pDesc);
         // Do not add input file since its already added when the manifest file is present.
-        if (manifestFilePresent) {
-          toMerge.addAll(manifestDirToFile.get(status[i])
+        if (manifestFilePresent || useCustomStorageHandler) {
+          toMerge.addAll(parentDirToFile.get(status[i])
               .stream().map(FileStatus::getPath).collect(Collectors.toList()));
         } else {
           toMerge.add(status[i].getPath());
@@ -512,7 +555,30 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver,
     mapWork.setUseInputPathsDirectly(true);
   }
 
-  private Map<FileStatus, List<FileStatus>> getManifestDirs(FileSystem inpFs, List<FileStatus> fileStatuses)
+  private void setupWorkWithCustomHandler(MapWork mapWork, Path dirPath,
+                                          MergeTaskProperties mergeProperties) throws IOException, ClassNotFoundException {
+    Map<String, Operator<? extends OperatorDesc>> aliasToWork = mapWork.getAliasToWork();
+    Map<Path, PartitionDesc> pathToPartitionInfo = mapWork.getPathToPartitionInfo();
+    Operator<? extends OperatorDesc> op = aliasToWork.get(dirPath.toString());
+    PartitionDesc partitionDesc = pathToPartitionInfo.get(dirPath);
+    Path tmpDir = mergeProperties.getTmpLocation();
+    if (op != null) {
+      aliasToWork.remove(dirPath.toString());
+      aliasToWork.put(tmpDir.toString(), op);
+      mapWork.setAliasToWork(aliasToWork);
+    }
+    if (partitionDesc != null) {
+      updatePartDescProperties(partitionDesc, mergeProperties);
+      pathToPartitionInfo.remove(dirPath);
+      pathToPartitionInfo.put(tmpDir, partitionDesc);
+      mapWork.setPathToPartitionInfo(pathToPartitionInfo);
+    }
+    mapWork.removePathToAlias(dirPath);
+    mapWork.addPathToAlias(tmpDir, tmpDir.toString());
+    mapWork.setUseInputPathsDirectly(true);
+  }
+
+  private Map<FileStatus, List<FileStatus>> getParentDirToFileMap(FileSystem inpFs, List<FileStatus> fileStatuses)
       throws IOException {
     Map<FileStatus, List<FileStatus>> manifestDirsToPaths = new HashMap<>();
     for (FileStatus fileStatus : fileStatuses) {
@@ -527,4 +593,22 @@ public class ConditionalResolverMergeFiles implements 
ConditionalResolver,
     }
     return manifestDirsToPaths;
   }
+
+  private void updatePartDescProperties(PartitionDesc partitionDesc,
+                                        MergeTaskProperties mergeProperties) throws IOException, ClassNotFoundException {
+    if (mergeProperties != null) {
+      String inputFileFormatClassName = mergeProperties.getStorageFormatDescriptor().getInputFormat();
+      String outputFileFormatClassName = mergeProperties.getStorageFormatDescriptor().getOutputFormat();
+      String serdeClassName = mergeProperties.getStorageFormatDescriptor().getSerde();
+      if (inputFileFormatClassName != null) {
+        partitionDesc.setInputFileFormatClass(JavaUtils.loadClass(inputFileFormatClassName));
+      }
+      if (outputFileFormatClassName != null) {
+        partitionDesc.setOutputFileFormatClass(JavaUtils.loadClass(outputFileFormatClassName));
+      }
+      if (serdeClassName != null) {
+        partitionDesc.getTableDesc().getProperties().setProperty(serdeConstants.SERIALIZATION_LIB, serdeClassName);
+      }
+    }
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MergeTaskProperties.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MergeTaskProperties.java
new file mode 100644
index 00000000000..6083b1b117f
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MergeTaskProperties.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor;
+
+import java.io.IOException;
+
+public interface MergeTaskProperties {
+  public Path getTmpLocation();
+
+  public StorageFormatDescriptor getStorageFormatDescriptor() throws IOException;
+}
