This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d3acc7a193 HIVE-27589: Iceberg: Branches of Merge/Update statements should be committed atomically (Simhadri Govindappa, Denys Kuzmenko, reviewed by Krisztian Kasa, Butao Zhang)
3d3acc7a193 is described below

commit 3d3acc7a19399d749a39818573a76a0dbbaf2598
Author: Simhadri Govindappa <simhadri...@gmail.com>
AuthorDate: Mon Aug 21 17:56:03 2023 +0530

    HIVE-27589: Iceberg: Branches of Merge/Update statements should be committed atomically (Simhadri Govindappa, Denys Kuzmenko, reviewed by Krisztian Kasa, Butao Zhang)
    
    Closes #4575
---
 .../org/apache/iceberg/mr/InputFormatConfig.java   |   1 -
 .../mr/hive/HiveIcebergOutputCommitter.java        | 202 +++++++++--------
 .../iceberg/mr/hive/HiveIcebergStorageHandler.java |  15 +-
 .../hive/HiveIcebergStorageHandlerTestUtils.java   |   2 +-
 .../apache/iceberg/mr/hive/TestHiveIcebergV2.java  | 119 ++++++++++
 .../org/apache/iceberg/mr/hive/TestHiveShell.java  |  26 ++-
 .../queries/positive/iceberg_atomic_merge_update.q |  99 ++++++++
 .../positive/iceberg_atomic_merge_update.q.out     | 248 +++++++++++++++++++++
 ql/src/java/org/apache/hadoop/hive/ql/Context.java |   9 +-
 .../org/apache/hadoop/hive/ql/exec/MoveTask.java   |  10 +-
 .../hive/ql/metadata/HiveStorageHandler.java       |  11 +-
 11 files changed, 623 insertions(+), 119 deletions(-)
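
At a glance, the behavioral change is that HiveIcebergOutputCommitter now groups the write results of all jobs targeting the same table and publishes them as a single Iceberg commit with conflict validation, rather than committing per job. A minimal sketch of that commit pattern, using only Iceberg API calls that appear in the patch below (the table, branch, snapshot id, and file lists are assumed to have been collected from the job contexts; the guards around the validations are simplified):

    import java.util.List;
    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.DeleteFile;
    import org.apache.iceberg.RowDelta;
    import org.apache.iceberg.Table;

    class AtomicRowDeltaSketch {
      // Publishes the data and delete files of all jobs writing to 'table' as one snapshot.
      static void commit(Table table, String branch, Long snapshotId,
                         List<DataFile> dataFiles, List<DeleteFile> deleteFiles) {
        RowDelta rowDelta = table.newRowDelta();
        dataFiles.forEach(rowDelta::addRows);        // new data files from every job context
        deleteFiles.forEach(rowDelta::addDeletes);   // new delete files from every job context
        if (branch != null) {
          rowDelta.toBranch(branch);                 // commit to the requested snapshot ref
        }
        if (snapshotId != null) {
          rowDelta.validateFromSnapshot(snapshotId); // validate against the snapshot seen at query start
        }
        rowDelta.validateDeletedFiles();             // fail if referenced files were deleted concurrently
        rowDelta.validateNoConflictingDeleteFiles(); // fail on concurrent conflicting delete files
        rowDelta.validateNoConflictingDataFiles();   // fail on concurrent conflicting data files
        rowDelta.commit();                           // single atomic commit for the whole MERGE/UPDATE
      }
    }

The concurrent UPDATE/MERGE tests added to TestHiveIcebergV2 exercise this path: one of two competing writers commits and the other fails with a ValidationException about conflicting files.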

diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
index eb212766c7c..831edd83d0c 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
@@ -68,7 +68,6 @@ public class InputFormatConfig {
   public static final String COMMIT_FILE_THREAD_POOL_SIZE = "iceberg.mr.commit.file.thread.pool.size";
   public static final int COMMIT_FILE_THREAD_POOL_SIZE_DEFAULT = 10;
   public static final String WRITE_TARGET_FILE_SIZE = "iceberg.mr.write.target.file.size";
-  public static final String IS_OVERWRITE = "iceberg.mr.write.is.overwrite";
 
   public static final String CASE_SENSITIVE = "iceberg.mr.case.sensitive";
   public static final boolean CASE_SENSITIVE_DEFAULT = true;
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
index 1ac8a3225ec..db62dcef1e9 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
@@ -22,6 +22,7 @@ package org.apache.iceberg.mr.hive;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -35,15 +36,14 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context.Operation;
 import org.apache.hadoop.hive.ql.metadata.HiveUtils;
-import org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils;
 import org.apache.hadoop.hive.ql.session.SessionStateUtil;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobContext;
@@ -53,12 +53,13 @@ import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.iceberg.AppendFiles;
-import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.DeleteFiles;
 import org.apache.iceberg.ReplacePartitions;
 import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.exceptions.NotFoundException;
 import org.apache.iceberg.expressions.Expressions;
@@ -71,9 +72,9 @@ import org.apache.iceberg.mr.hive.writer.HiveIcebergWriter;
 import org.apache.iceberg.mr.hive.writer.WriterRegistry;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.iceberg.util.Tasks;
 import org.slf4j.Logger;
@@ -189,7 +190,7 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
 
   @Override
   public void commitJob(JobContext originalContext) throws IOException {
-    commitJobs(Collections.singletonList(originalContext));
+    commitJobs(Collections.singletonList(originalContext), Operation.OTHER);
   }
 
   /**
@@ -200,16 +201,16 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
     private final String catalogName;
     private final String tableName;
     private final Table table;
-    private final JobContext jobContext;
-    private final SessionStateUtil.CommitInfo commitInfo;
+    private List<JobContext> jobContexts;
 
-    private OutputTable(String catalogName, String tableName, Table table, JobContext jobContext,
-                        SessionStateUtil.CommitInfo commitInfo) {
+    private OutputTable(String catalogName, String tableName, Table table) {
       this.catalogName = catalogName;
       this.tableName = tableName;
       this.table = table;
-      this.jobContext = jobContext;
-      this.commitInfo = commitInfo;
+    }
+
+    public void setJobContexts(List<JobContext> jobContexts) {
+      this.jobContexts = ImmutableList.copyOf(jobContexts);
     }
 
     @Override
@@ -221,17 +222,12 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
         return false;
       }
       OutputTable output1 = (OutputTable) o;
-      return Objects.equals(tableName, output1.tableName) &&
-          Objects.equals(jobContext.getJobID(), output1.jobContext.getJobID());
+      return Objects.equals(tableName, output1.tableName);
     }
 
     @Override
     public int hashCode() {
-      return Objects.hash(tableName, jobContext.getJobID());
-    }
-
-    public Optional<SessionStateUtil.CommitInfo> getCommitInfo() {
-      return Optional.ofNullable(commitInfo);
+      return Objects.hash(tableName);
     }
   }
 
@@ -241,11 +237,11 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
    * @param originalContextList The job context list
    * @throws IOException if there is a failure accessing the files
    */
-  public void commitJobs(List<JobContext> originalContextList) throws IOException {
+  public void commitJobs(List<JobContext> originalContextList, Operation operation) throws IOException {
     List<JobContext> jobContextList = originalContextList.stream()
         .map(TezUtil::enrichContextWithVertexId)
         .collect(Collectors.toList());
-    Set<OutputTable> outputs = collectOutputs(jobContextList);
+    List<OutputTable> outputs = collectOutputs(jobContextList);
     long startTime = System.currentTimeMillis();
 
     String ids = jobContextList.stream()
@@ -253,7 +249,6 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
     LOG.info("Committing job(s) {} has started", ids);
 
     Collection<String> jobLocations = new ConcurrentLinkedQueue<>();
-
     ExecutorService fileExecutor = fileExecutor(jobContextList.get(0).getJobConf());
     ExecutorService tableExecutor = tableExecutor(jobContextList.get(0).getJobConf(), outputs.size());
     try {
@@ -263,10 +258,12 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
           .stopOnFailure()
           .executeWith(tableExecutor)
           .run(output -> {
-            JobConf jobConf = output.jobContext.getJobConf();
             Table table = output.table;
-            jobLocations.add(generateJobLocation(table.location(), jobConf, output.jobContext.getJobID()));
-            commitTable(table.io(), fileExecutor, output);
+            jobLocations.addAll(
+                output.jobContexts.stream().map(jobContext ->
+                  generateJobLocation(table.location(), jobContext.getJobConf(), jobContext.getJobID()))
+                .collect(Collectors.toList()));
+            commitTable(table.io(), fileExecutor, output, operation);
           });
     } finally {
       fileExecutor.shutdown();
@@ -282,30 +279,27 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
     }
   }
 
-  private Set<OutputTable> collectOutputs(List<JobContext> jobContextList) {
-    Set<OutputTable> outputs = Sets.newHashSet();
-    for (JobContext jobContext : jobContextList) {
-      Set<String> outputNames = HiveIcebergStorageHandler.outputTables(jobContext.getJobConf());
-      for (String output : outputNames) {
-        Table table = SessionStateUtil.getResource(jobContext.getJobConf(), output)
+  private List<OutputTable> collectOutputs(List<JobContext> jobContextList) {
+    return jobContextList.stream()
+      .flatMap(jobContext -> HiveIcebergStorageHandler.outputTables(jobContext.getJobConf()).stream()
+        .map(output -> new OutputTable(
+          HiveIcebergStorageHandler.catalogName(jobContext.getJobConf(), output),
+          output,
+          SessionStateUtil.getResource(jobContext.getJobConf(), output)
             .filter(o -> o instanceof Table).map(o -> (Table) o)
             // fall back to getting the serialized table from the config
-            .orElseGet(() -> HiveIcebergStorageHandler.table(jobContext.getJobConf(), output));
-        if (table == null) {
-          LOG.info("CommitJob found no table object in QueryState or conf for: {}. Skipping job commit.", output);
-          continue;
-        }
-
-        SessionStateUtil.CommitInfo commitInfo = null;
-        if (SessionStateUtil.getCommitInfo(jobContext.getJobConf(), output).isPresent()) {
-          commitInfo = SessionStateUtil.getCommitInfo(jobContext.getJobConf(), output)
-              .get().get(jobContext.getJobID().toString());
-        }
-        String catalogName = HiveIcebergStorageHandler.catalogName(jobContext.getJobConf(), output);
-        outputs.add(new OutputTable(catalogName, output, table, jobContext, commitInfo));
-      }
-    }
-    return outputs;
+            .orElseGet(() -> HiveIcebergStorageHandler.table(jobContext.getJobConf(), output))))
+        .filter(output -> Objects.nonNull(output.table))
+        .map(output -> new SimpleImmutableEntry<>(output, jobContext)))
+      .collect(
+        Collectors.groupingBy(Map.Entry::getKey,
+          Collectors.mapping(Map.Entry::getValue, Collectors.toList()))
+      ).entrySet().stream().map(
+        kv -> {
+          kv.getKey().setJobContexts(kv.getValue());
+          return kv.getKey();
+        })
+      .collect(Collectors.toList());
   }
 
   /**
@@ -324,7 +318,7 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
     List<JobContext> jobContextList = originalContextList.stream()
         .map(TezUtil::enrichContextWithVertexId)
         .collect(Collectors.toList());
-    Set<OutputTable> outputs = collectOutputs(jobContextList);
+    List<OutputTable> outputs = collectOutputs(jobContextList);
 
     String ids = jobContextList.stream()
         .map(jobContext -> jobContext.getJobID().toString()).collect(Collectors.joining(","));
@@ -336,16 +330,17 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
     ExecutorService tableExecutor = tableExecutor(jobContextList.get(0).getJobConf(), outputs.size());
     try {
       // Cleans up the changes for the output tables in parallel
-      Tasks.foreach(outputs)
+      Tasks.foreach(outputs.stream().flatMap(kv -> kv.jobContexts.stream()
+              .map(jobContext -> new SimpleImmutableEntry<>(kv.table, jobContext))))
           .suppressFailureWhenFinished()
           .executeWith(tableExecutor)
           .onFailure((output, exc) -> LOG.warn("Failed cleanup table {} on abort job", output, exc))
           .run(output -> {
-            JobContext jobContext = output.jobContext;
+            JobContext jobContext = output.getValue();
             JobConf jobConf = jobContext.getJobConf();
             LOG.info("Cleaning job for jobID: {}, table: {}", jobContext.getJobID(), output);
 
-            Table table = output.table;
+            Table table = output.getKey();
             String jobLocation = generateJobLocation(table.location(), jobConf, jobContext.getJobID());
             jobLocations.add(jobLocation);
             // list jobLocation to get number of forCommit files
@@ -354,17 +349,12 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
             FilesForCommit results = collectResults(numTasks, fileExecutor, table.location(), jobContext,
                 table.io(), false);
             // Check if we have files already written and remove data and delta files if there are any
-            Collection<ContentFile> files = Stream.concat(results.dataFiles().stream(), results.deleteFiles().stream())
-                .collect(Collectors.toList());
-
-            if (files.size() > 0) {
-              Tasks.foreach(files)
-                  .retry(3)
-                  .suppressFailureWhenFinished()
-                  .executeWith(fileExecutor)
-                  .onFailure((file, exc) -> LOG.warn("Failed to remove data file {} on abort job", file.path(), exc))
-                  .run(file -> table.io().deleteFile(file.path().toString()));
-            }
+            Tasks.foreach(results.allFiles())
+                .retry(3)
+                .suppressFailureWhenFinished()
+                .executeWith(fileExecutor)
+                .onFailure((file, exc) -> LOG.warn("Failed to remove data file {} on abort job", file.path(), exc))
+                .run(file -> table.io().deleteFile(file.path().toString()));
           }, IOException.class);
     } finally {
       fileExecutor.shutdown();
@@ -405,48 +395,71 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
    * @param executor The executor used to read the forCommit files
    * @param outputTable The table used for loading from the catalog
    */
-  private void commitTable(FileIO io, ExecutorService executor, OutputTable outputTable) {
+  private void commitTable(FileIO io, ExecutorService executor, OutputTable outputTable, Operation operation) {
     String name = outputTable.tableName;
-    JobContext jobContext = outputTable.jobContext;
-    JobConf conf = jobContext.getJobConf();
     Properties catalogProperties = new Properties();
     catalogProperties.put(Catalogs.NAME, name);
     catalogProperties.put(Catalogs.LOCATION, outputTable.table.location());
     if (outputTable.catalogName != null) {
       catalogProperties.put(InputFormatConfig.CATALOG_NAME, outputTable.catalogName);
     }
-    Table table = Catalogs.loadTable(conf, catalogProperties);
+    List<DataFile> dataFiles = Lists.newArrayList();
+    List<DeleteFile> deleteFiles = Lists.newArrayList();
+
+    Table table = null;
+    String branchName = null;
+
+    for (JobContext jobContext : outputTable.jobContexts) {
+      JobConf conf = jobContext.getJobConf();
+      table = Optional.ofNullable(table).orElse(Catalogs.loadTable(conf, catalogProperties));
+      branchName = conf.get(InputFormatConfig.OUTPUT_TABLE_SNAPSHOT_REF);
+
+      LOG.info("Committing job has started for table: {}, using location: {}",
+          table, generateJobLocation(outputTable.table.location(), conf, jobContext.getJobID()));
+
+      int numTasks = SessionStateUtil.getCommitInfo(conf, name).map(info -> info.get(jobContext.getJobID().toString()))
+          .map(SessionStateUtil.CommitInfo::getTaskNum).orElseGet(() -> {
+            // Fallback logic, if number of tasks are not available in the config
+            // If there are reducers, then every reducer will generate a result file.
+            // If this is a map only task, then every mapper will generate a result file.
+            LOG.info("Number of tasks not available in session state for jobID: {}, table: {}. " +
+                "Falling back to jobConf numReduceTasks/numMapTasks", jobContext.getJobID(), name);
+            return conf.getNumReduceTasks() > 0 ? conf.getNumReduceTasks() : conf.getNumMapTasks();
+          });
 
+      FilesForCommit writeResults = collectResults(
+          numTasks, executor, outputTable.table.location(), jobContext, io, true);
+      dataFiles.addAll(writeResults.dataFiles());
+      deleteFiles.addAll(writeResults.deleteFiles());
+    }
+
+    FilesForCommit filesForCommit = new FilesForCommit(dataFiles, deleteFiles);
     long startTime = System.currentTimeMillis();
-    LOG.info("Committing job has started for table: {}, using location: {}",
-        table, generateJobLocation(outputTable.table.location(), conf, jobContext.getJobID()));
-
-    Optional<SessionStateUtil.CommitInfo> commitInfo = outputTable.getCommitInfo();
-    int numTasks = commitInfo.map(SessionStateUtil.CommitInfo::getTaskNum).orElseGet(() -> {
-      // Fallback logic, if number of tasks are not available in the config
-      // If there are reducers, then every reducer will generate a result file.
-      // If this is a map only task, then every mapper will generate a result file.
-      LOG.info("Number of tasks not available in session state for jobID: {}, table: {}. Falling back to jobConf " +
-          "numReduceTasks/numMapTasks", jobContext.getJobID(), name);
-      return conf.getNumReduceTasks() > 0 ? conf.getNumReduceTasks() : conf.getNumMapTasks();
-    });
-
-    FilesForCommit writeResults = collectResults(
-        numTasks, executor, outputTable.table.location(), jobContext, io, true);
-    String branchName = conf.get(InputFormatConfig.OUTPUT_TABLE_SNAPSHOT_REF);
-    if (!conf.getBoolean(InputFormatConfig.IS_OVERWRITE, false)) {
-      if (writeResults.isEmpty()) {
+
+    if (Operation.IOW != operation) {
+      if (filesForCommit.isEmpty()) {
         LOG.info(
-            "Not creating a new commit for table: {}, jobID: {}, operation: {}, since there were no new files to add",
-            table, jobContext.getJobID(), HiveCustomStorageHandlerUtils.getWriteOperation(conf, name));
+            "Not creating a new commit for table: {}, jobIDs: {}, since there were no new files to add",
+            table, outputTable.jobContexts.stream().map(JobContext::getJobID)
+                .map(String::valueOf).collect(Collectors.joining(",")));
       } else {
-        commitWrite(table, branchName, startTime, writeResults);
+        Long snapshotId = getSnapshotId(outputTable.table, branchName);
+        commitWrite(table, branchName, snapshotId, startTime, filesForCommit, operation);
       }
     } else {
-      commitOverwrite(table, branchName, startTime, writeResults);
+      commitOverwrite(table, branchName, startTime, filesForCommit);
     }
   }
 
+  private Long getSnapshotId(Table table, String branchName) {
+    Optional<Long> snapshotId = Optional.ofNullable(table.currentSnapshot()).map(Snapshot::snapshotId);
+    if (StringUtils.isNotEmpty(branchName)) {
+      String ref = HiveUtils.getTableSnapshotRef(branchName);
+      snapshotId = Optional.ofNullable(table.refs().get(ref)).map(SnapshotRef::snapshotId);
+    }
+    return snapshotId.orElse(null);
+  }
+
   /**
    * Creates and commits an Iceberg change with the provided data and delete files.
    * If there are no delete files then an Iceberg 'append' is created, otherwise Iceberg 'overwrite' is created.
@@ -454,14 +467,17 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
    * @param startTime The start time of the commit - used only for logging
    * @param results The object containing the new files we would like to add to the table
    */
-  private void commitWrite(Table table, String branchName, long startTime, FilesForCommit results) {
-    if (results.deleteFiles().isEmpty()) {
+  private void commitWrite(Table table, String branchName, Long snapshotId, long startTime,
+      FilesForCommit results, Operation operation) {
+
+    if (results.deleteFiles().isEmpty() && Operation.MERGE != operation) {
       AppendFiles write = table.newAppend();
       results.dataFiles().forEach(write::appendFile);
       if (StringUtils.isNotEmpty(branchName)) {
         write.toBranch(HiveUtils.getTableSnapshotRef(branchName));
       }
       write.commit();
+
     } else {
       RowDelta write = table.newRowDelta();
       results.dataFiles().forEach(write::addRows);
@@ -469,6 +485,14 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
       if (StringUtils.isNotEmpty(branchName)) {
         write.toBranch(HiveUtils.getTableSnapshotRef(branchName));
       }
+      if (snapshotId != null) {
+        write.validateFromSnapshot(snapshotId);
+      }
+      if (!results.dataFiles().isEmpty()) {
+        write.validateDeletedFiles();
+        write.validateNoConflictingDeleteFiles();
+      }
+      write.validateNoConflictingDataFiles();
       write.commit();
     }
 
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index ef74cf6bf31..19d25b6fbd0 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -98,6 +98,7 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicListDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.HiveOperation;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -563,7 +564,9 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
       // Analyze table and stats updater thread
       return fs.delete(statsPath, true);
     }
-    return false;
+    return SessionStateUtil.getQueryState(conf).map(QueryState::getHiveOperation)
+      .filter(opType -> HiveOperation.ANALYZE_TABLE == opType)
+      .isPresent();
   }
 
   private void checkAndMergeColStats(ColumnStatistics statsObjNew, Table tbl) throws InvalidObjectException {
@@ -703,7 +706,8 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
   }
 
   @Override
-  public void storageHandlerCommit(Properties commitProperties, boolean overwrite) throws HiveException {
+  public void storageHandlerCommit(Properties commitProperties, Operation operation)
+        throws HiveException {
     String tableName = commitProperties.getProperty(Catalogs.NAME);
     String location = commitProperties.getProperty(Catalogs.LOCATION);
     String snapshotRef = commitProperties.getProperty(Catalogs.SNAPSHOT_REF);
@@ -711,14 +715,14 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
     if (location != null) {
       HiveTableUtil.cleanupTableObjectFile(location, configuration);
     }
-    List<JobContext> jobContextList = generateJobContext(configuration, tableName, snapshotRef, overwrite);
+    List<JobContext> jobContextList = generateJobContext(configuration, tableName, snapshotRef);
     if (jobContextList.isEmpty()) {
       return;
     }
 
     HiveIcebergOutputCommitter committer = new HiveIcebergOutputCommitter();
     try {
-      committer.commitJobs(jobContextList);
+      committer.commitJobs(jobContextList, operation);
     } catch (Throwable e) {
       String ids = jobContextList
           .stream().map(jobContext -> jobContext.getJobID().toString()).collect(Collectors.joining(", "));
@@ -1394,7 +1398,7 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
    * @return The generated Optional JobContext list or empty if not presents.
    */
   private List<JobContext> generateJobContext(Configuration configuration, String tableName,
-      String branchName, boolean overwrite) {
+      String branchName) {
     JobConf jobConf = new JobConf(configuration);
     Optional<Map<String, SessionStateUtil.CommitInfo>> commitInfoMap =
         SessionStateUtil.getCommitInfo(jobConf, tableName);
@@ -1403,7 +1407,6 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
       for (SessionStateUtil.CommitInfo commitInfo : commitInfoMap.get().values()) {
         JobID jobID = JobID.forName(commitInfo.getJobIdStr());
         commitInfo.getProps().forEach(jobConf::set);
-        jobConf.setBoolean(InputFormatConfig.IS_OVERWRITE, overwrite);
 
         // we should only commit this current table because
         // for multi-table inserts, this hook method will be called sequentially for each target table
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerTestUtils.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerTestUtils.java
index 572564542e8..2010b7cac08 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerTestUtils.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerTestUtils.java
@@ -106,7 +106,7 @@ public class HiveIcebergStorageHandlerTestUtils {
   }
 
   static void init(TestHiveShell shell, TestTables testTables, TemporaryFolder temp, String engine) {
-    shell.openSession();
+    shell.getSession();
 
     for (Map.Entry<String, String> property : testTables.properties().entrySet()) {
       shell.setHiveSessionValue(property.getKey(), property.getValue());
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergV2.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergV2.java
index 721e3d012ee..3f474cfe79a 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergV2.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergV2.java
@@ -21,7 +21,9 @@ package org.apache.iceberg.mr.hive;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.Executors;
 import java.util.stream.StreamSupport;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
@@ -33,16 +35,20 @@ import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.mr.TestHelper;
+import org.apache.iceberg.relocated.com.google.common.base.Throwables;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Tasks;
 import org.apache.thrift.TException;
 import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Test;
 
+import static org.apache.iceberg.mr.hive.HiveIcebergStorageHandlerTestUtils.init;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
 
@@ -646,6 +652,119 @@ public class TestHiveIcebergV2 extends HiveIcebergStorageHandlerWithEngineBase {
     shell.executeStatement("select count(*) from customers");
   }
 
+  @Test
+  public void testConcurrent2Deletes() {
+    Assume.assumeTrue(fileFormat == FileFormat.PARQUET && isVectorized &&
+        testTableType == TestTables.TestTableType.HIVE_CATALOG);
+
+    testTables.createTable(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        PartitionSpec.unpartitioned(), fileFormat, HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_2, 2);
+    String sql = "DELETE FROM customers WHERE customer_id=3 or first_name='Joanna'";
+
+    Tasks.range(2)
+        .executeWith(Executors.newFixedThreadPool(2))
+        .run(i -> {
+          init(shell, testTables, temp, executionEngine);
+          HiveConf.setBoolVar(shell.getHiveConf(), HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, isVectorized);
+          HiveConf.setVar(shell.getHiveConf(), HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none");
+          shell.executeStatement(sql);
+          shell.closeSession();
+        });
+    List<Object[]> res = shell.executeStatement("SELECT * FROM customers");
+    Assert.assertEquals(4, res.size());
+  }
+
+  @Test
+  public void testConcurrent2Updates() {
+    Assume.assumeTrue(fileFormat == FileFormat.PARQUET && isVectorized &&
+        testTableType == TestTables.TestTableType.HIVE_CATALOG);
+
+    testTables.createTable(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        PartitionSpec.unpartitioned(), fileFormat, HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_2, 2);
+    String sql = "UPDATE customers SET last_name='Changed' WHERE customer_id=3 or first_name='Joanna'";
+    try {
+      Tasks.range(2)
+          .executeWith(Executors.newFixedThreadPool(2))
+          .run(i -> {
+            init(shell, testTables, temp, executionEngine);
+            HiveConf.setBoolVar(shell.getHiveConf(), HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, isVectorized);
+            HiveConf.setVar(shell.getHiveConf(), HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none");
+            shell.executeStatement(sql);
+            shell.closeSession();
+          });
+    } catch (Throwable ex) {
+      Throwable cause = Throwables.getRootCause(ex);
+      Assert.assertTrue(cause instanceof ValidationException);
+      Assert.assertTrue(cause.getMessage().matches("^Found.*conflicting.*files(.*)"));
+    }
+    List<Object[]> res = shell.executeStatement("SELECT * FROM customers WHERE last_name='Changed'");
+    Assert.assertEquals(5, res.size());
+  }
+
+  @Test
+  public void testConcurrentUpdateAndDelete() {
+    Assume.assumeTrue(fileFormat == FileFormat.PARQUET && isVectorized &&
+        testTableType == TestTables.TestTableType.HIVE_CATALOG);
+
+    testTables.createTable(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        PartitionSpec.unpartitioned(), fileFormat, HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_2, 2);
+    String[] sql = new String[]{
+        "DELETE FROM customers WHERE customer_id=3 or first_name='Joanna'",
+        "UPDATE customers SET last_name='Changed' WHERE customer_id=3 or first_name='Joanna'"
+    };
+
+    boolean deleteFirst = false;
+    try {
+      Tasks.range(2)
+          .executeWith(Executors.newFixedThreadPool(2))
+          .run(i -> {
+            init(shell, testTables, temp, executionEngine);
+            HiveConf.setBoolVar(shell.getHiveConf(), HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, isVectorized);
+            HiveConf.setVar(shell.getHiveConf(), HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none");
+            shell.executeStatement(sql[i]);
+            shell.closeSession();
+          });
+    } catch (Throwable ex) {
+      Throwable cause = Throwables.getRootCause(ex);
+      Assert.assertTrue(cause instanceof ValidationException);
+      Assert.assertTrue(cause.getMessage().matches("^Found.*conflicting.*files(.*)"));
+      deleteFirst = cause.getMessage().contains("conflicting delete");
+    }
+    List<Object[]> res = shell.executeStatement("SELECT * FROM customers WHERE last_name='Changed'");
+    Assert.assertEquals(deleteFirst ? 0 : 5, res.size());
+  }
+
+  @Test
+  public void testConcurrent2MergeInserts() {
+    Assume.assumeTrue(fileFormat == FileFormat.PARQUET && isVectorized &&
+        testTableType == TestTables.TestTableType.HIVE_CATALOG);
+
+    testTables.createTable(shell, "source", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        PartitionSpec.unpartitioned(), fileFormat, HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_1);
+    testTables.createTable(shell, "target", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        PartitionSpec.unpartitioned(), fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 2);
+
+    String sql = "MERGE INTO target t USING source s on t.customer_id = s.customer_id WHEN Not MATCHED THEN " +
+        "INSERT values (s.customer_id, s.first_name, s.last_name)";
+    try {
+      Tasks.range(2)
+          .executeWith(Executors.newFixedThreadPool(2))
+          .run(i -> {
+            init(shell, testTables, temp, executionEngine);
+            HiveConf.setBoolVar(shell.getHiveConf(), HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, isVectorized);
+            HiveConf.setVar(shell.getHiveConf(), HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none");
+            shell.executeStatement(sql);
+            shell.closeSession();
+          });
+    } catch (Throwable ex) {
+      Throwable cause = Throwables.getRootCause(ex);
+      Assert.assertTrue(cause instanceof ValidationException);
+      Assert.assertTrue(cause.getMessage().startsWith("Found conflicting files"));
+    }
+    List<Object[]> res = shell.executeStatement("SELECT * FROM target");
+    Assert.assertEquals(6, res.size());
+  }
+
   private static <T> PositionDelete<T> positionDelete(CharSequence path, long pos, T row) {
     PositionDelete<T> positionDelete = PositionDelete.create();
     return positionDelete.set(path, pos, row);
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java
index dbb92305e2d..79e477bbe59 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java
@@ -54,9 +54,10 @@ public class TestHiveShell {
   private final HiveServer2 hs2;
   private final HiveConf hs2Conf;
   private CLIService client;
-  private HiveSession session;
   private boolean started;
 
+  private ThreadLocal<HiveSession> session = ThreadLocal.withInitial(this::openSession);
+
   public TestHiveShell() {
     metastore = new TestHiveMetastore();
     hs2Conf = initializeConf();
@@ -69,9 +70,9 @@ public class TestHiveShell {
   }
 
   public void setHiveSessionValue(String key, String value) {
-    Preconditions.checkState(session != null, "There is no open session for setting variables.");
+    Preconditions.checkState(session.get() != null, "There is no open session for setting variables.");
     try {
-      session.getSessionConf().set(key, value);
+      session.get().getSessionConf().set(key, value);
     } catch (Exception e) {
       throw new RuntimeException("Unable to set Hive session variable: ", e);
     }
@@ -116,32 +117,33 @@ public class TestHiveShell {
     return metastore;
   }
 
-  public void openSession() {
+  private HiveSession openSession() {
     Preconditions.checkState(started, "You have to start TestHiveShell first, before opening a session.");
     try {
       SessionHandle sessionHandle = client.getSessionManager().openSession(
           CLIService.SERVER_VERSION, "", "", "127.0.0.1", Collections.emptyMap());
-      session = client.getSessionManager().getSession(sessionHandle);
+      return client.getSessionManager().getSession(sessionHandle);
     } catch (Exception e) {
       throw new RuntimeException("Unable to open new Hive session: ", e);
     }
   }
 
   public void closeSession() {
-    Preconditions.checkState(session != null, "There is no open session to be closed.");
+    Preconditions.checkState(session.get() != null, "There is no open session to be closed.");
     try {
-      session.close();
-      session = null;
+      session.get().close();
+      session.remove();
     } catch (Exception e) {
       throw new RuntimeException("Unable to close Hive session: ", e);
     }
   }
 
   public List<Object[]> executeStatement(String statement) {
-    Preconditions.checkState(session != null,
+    Preconditions.checkState(session.get() != null,
             "You have to start TestHiveShell and open a session first, before running a query.");
     try {
-      OperationHandle handle = client.executeStatement(session.getSessionHandle(), statement, Collections.emptyMap());
+      OperationHandle handle = client.executeStatement(session.get().getSessionHandle(), statement,
+          Collections.emptyMap());
       List<Object[]> resultSet = Lists.newArrayList();
       if (handle.hasResultSet()) {
         RowSet rowSet;
@@ -172,14 +174,14 @@ public class TestHiveShell {
 
   public Configuration getHiveConf() {
     if (session != null) {
-      return session.getHiveConf();
+      return session.get().getHiveConf();
     } else {
       return hs2Conf;
     }
   }
 
   public HiveSession getSession() {
-    return session;
+    return session.get();
   }
 
   private HiveConf initializeConf() {
diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_atomic_merge_update.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_atomic_merge_update.q
new file mode 100644
index 00000000000..e0809721a53
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_atomic_merge_update.q
@@ -0,0 +1,99 @@
+-- SORT_QUERY_RESULTS
+
+-- Mask neededVirtualColumns due to non-strict order
+--! qt:replace:/(\s+neededVirtualColumns:\s)(.*)/$1#Masked#/
+
+set hive.optimize.shared.work.merge.ts.schema=true;
+set hive.vectorized.execution.enabled=true;
+
+CREATE EXTERNAL TABLE calls (
+  s_key bigint,
+  year int
+) PARTITIONED BY SPEC (year)
+STORED BY Iceberg STORED AS parquet
+TBLPROPERTIES ('format-version'='2');
+
+INSERT INTO calls (s_key, year) VALUES (1090969, 2022);
+
+
+CREATE EXTERNAL TABLE display (
+  skey bigint,
+  hierarchy_number string,
+  hierarchy_name string,
+  language_id int,
+  hierarchy_display string,
+  orderby string
+)
+STORED BY Iceberg STORED AS parquet
+TBLPROPERTIES ('format-version'='2');
+
+INSERT INTO display (skey, language_id, hierarchy_display) VALUES
+  (1090969, 3, 'f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1'),
+  (1090969, 3, 'f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1-insertnew1');
+  
+
+MERGE INTO display USING (
+  SELECT distinct display_skey, display, display as orig_display
+  FROM (
+    SELECT D.skey display_skey, D.hierarchy_display display
+    FROM (
+      SELECT s_key FROM calls WHERE s_key =  1090969
+    ) R
+    INNER JOIN display D
+      ON R.s_key = D.skey AND D.language_id = 3
+    GROUP BY D.skey,
+      D.hierarchy_display
+  ) sub1
+
+  UNION ALL
+
+  SELECT distinct display_skey, null as display, display as orig_display
+  FROM (
+    SELECT D.skey display_skey, D.hierarchy_display display
+    FROM (
+      SELECT s_key FROM calls WHERE s_key =  1090969
+    ) R
+    INNER JOIN display D
+      ON R.s_key = D.skey AND D.language_id = 3
+    GROUP BY D.skey,
+      D.hierarchy_display
+  ) sub2
+) sub
+ON display.skey = sub.display_skey
+    and display.hierarchy_display = sub.display
+
+WHEN MATCHED THEN
+  UPDATE SET hierarchy_display = concat(sub.display, '-mergeupdated1')
+WHEN NOT MATCHED THEN
+  INSERT (skey, language_id, hierarchy_display) values (sub.display_skey, 3, concat(sub.orig_display, '-mergenew1'));
+
+select s.operation, s.summary['added-records'], s.summary['deleted-records'] from default.display.snapshots s
+  order by s.snapshot_id;
+
+
+-- clean up
+DROP TABLE calls;
+DROP TABLE display;
+
+
+-- Update
+
+CREATE EXTERNAL TABLE calls_v2 (
+  s_key bigint,
+  year int
+) PARTITIONED BY SPEC (year)
+STORED BY Iceberg STORED AS parquet
+TBLPROPERTIES ('format-version'='2');
+
+INSERT INTO calls_v2 (s_key, year) VALUES (1, 2022), (2, 2023), (3, 2024),  (4, 2024), (5, 2025), (1, 2022), (2, 2023),
+(3, 2024),  (4, 2024), (5, 2025);
+
+update calls_v2 set s_key=10 where year=2023;
+delete from calls_v2 where year=2024;  
+
+select s.operation, s.summary['added-records'], s.summary['deleted-records'] from default.calls_v2.snapshots s
+  order by s.snapshot_id;
+
+-- clean up
+
+DROP TABLE calls_v2
\ No newline at end of file
diff --git a/iceberg/iceberg-handler/src/test/results/positive/iceberg_atomic_merge_update.q.out b/iceberg/iceberg-handler/src/test/results/positive/iceberg_atomic_merge_update.q.out
new file mode 100644
index 00000000000..7bc1a8db135
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/results/positive/iceberg_atomic_merge_update.q.out
@@ -0,0 +1,248 @@
+PREHOOK: query: CREATE EXTERNAL TABLE calls (
+  s_key bigint,
+  year int
+) PARTITIONED BY SPEC (year)
+STORED BY Iceberg STORED AS parquet
+TBLPROPERTIES ('format-version'='2')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@calls
+POSTHOOK: query: CREATE EXTERNAL TABLE calls (
+  s_key bigint,
+  year int
+) PARTITIONED BY SPEC (year)
+STORED BY Iceberg STORED AS parquet
+TBLPROPERTIES ('format-version'='2')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@calls
+PREHOOK: query: INSERT INTO calls (s_key, year) VALUES (1090969, 2022)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@calls
+POSTHOOK: query: INSERT INTO calls (s_key, year) VALUES (1090969, 2022)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@calls
+PREHOOK: query: CREATE EXTERNAL TABLE display (
+  skey bigint,
+  hierarchy_number string,
+  hierarchy_name string,
+  language_id int,
+  hierarchy_display string,
+  orderby string
+)
+STORED BY Iceberg STORED AS parquet
+TBLPROPERTIES ('format-version'='2')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@display
+POSTHOOK: query: CREATE EXTERNAL TABLE display (
+  skey bigint,
+  hierarchy_number string,
+  hierarchy_name string,
+  language_id int,
+  hierarchy_display string,
+  orderby string
+)
+STORED BY Iceberg STORED AS parquet
+TBLPROPERTIES ('format-version'='2')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@display
+PREHOOK: query: INSERT INTO display (skey, language_id, hierarchy_display) VALUES
+  (1090969, 3, 'f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1'),
+  (1090969, 3, 'f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1-insertnew1')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@display
+POSTHOOK: query: INSERT INTO display (skey, language_id, hierarchy_display) VALUES
+  (1090969, 3, 'f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1'),
+  (1090969, 3, 'f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1-insertnew1')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@display
+Warning: Shuffle Join MERGEJOIN[67][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 2' is a cross product
+Warning: Shuffle Join MERGEJOIN[68][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 8' is a cross product
+PREHOOK: query: MERGE INTO display USING (
+  SELECT distinct display_skey, display, display as orig_display
+  FROM (
+    SELECT D.skey display_skey, D.hierarchy_display display
+    FROM (
+      SELECT s_key FROM calls WHERE s_key =  1090969
+    ) R
+    INNER JOIN display D
+      ON R.s_key = D.skey AND D.language_id = 3
+    GROUP BY D.skey,
+      D.hierarchy_display
+  ) sub1
+
+  UNION ALL
+
+  SELECT distinct display_skey, null as display, display as orig_display
+  FROM (
+    SELECT D.skey display_skey, D.hierarchy_display display
+    FROM (
+      SELECT s_key FROM calls WHERE s_key =  1090969
+    ) R
+    INNER JOIN display D
+      ON R.s_key = D.skey AND D.language_id = 3
+    GROUP BY D.skey,
+      D.hierarchy_display
+  ) sub2
+) sub
+ON display.skey = sub.display_skey
+    and display.hierarchy_display = sub.display
+
+WHEN MATCHED THEN
+  UPDATE SET hierarchy_display = concat(sub.display, '-mergeupdated1')
+WHEN NOT MATCHED THEN
+  INSERT (skey, language_id, hierarchy_display) values (sub.display_skey, 3, concat(sub.orig_display, '-mergenew1'))
+PREHOOK: type: QUERY
+PREHOOK: Input: default@calls
+PREHOOK: Input: default@display
+PREHOOK: Output: default@display
+PREHOOK: Output: default@display
+PREHOOK: Output: default@merge_tmp_table
+POSTHOOK: query: MERGE INTO display USING (
+  SELECT distinct display_skey, display, display as orig_display
+  FROM (
+    SELECT D.skey display_skey, D.hierarchy_display display
+    FROM (
+      SELECT s_key FROM calls WHERE s_key =  1090969
+    ) R
+    INNER JOIN display D
+      ON R.s_key = D.skey AND D.language_id = 3
+    GROUP BY D.skey,
+      D.hierarchy_display
+  ) sub1
+
+  UNION ALL
+
+  SELECT distinct display_skey, null as display, display as orig_display
+  FROM (
+    SELECT D.skey display_skey, D.hierarchy_display display
+    FROM (
+      SELECT s_key FROM calls WHERE s_key =  1090969
+    ) R
+    INNER JOIN display D
+      ON R.s_key = D.skey AND D.language_id = 3
+    GROUP BY D.skey,
+      D.hierarchy_display
+  ) sub2
+) sub
+ON display.skey = sub.display_skey
+    and display.hierarchy_display = sub.display
+
+WHEN MATCHED THEN
+  UPDATE SET hierarchy_display = concat(sub.display, '-mergeupdated1')
+WHEN NOT MATCHED THEN
+  INSERT (skey, language_id, hierarchy_display) values (sub.display_skey, 3, concat(sub.orig_display, '-mergenew1'))
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@calls
+POSTHOOK: Input: default@display
+POSTHOOK: Output: default@display
+POSTHOOK: Output: default@display
+POSTHOOK: Output: default@merge_tmp_table
+POSTHOOK: Lineage: merge_tmp_table.val EXPRESSION [(display)display.null, ]
+PREHOOK: query: select s.operation, s.summary['added-records'], s.summary['deleted-records'] from default.display.snapshots s
+  order by s.snapshot_id
+PREHOOK: type: QUERY
+PREHOOK: Input: default@display
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select s.operation, s.summary['added-records'], s.summary['deleted-records'] from default.display.snapshots s
+  order by s.snapshot_id
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@display
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+append 2       NULL
+overwrite      4       NULL
+PREHOOK: query: DROP TABLE calls
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@calls
+PREHOOK: Output: database:default
+PREHOOK: Output: default@calls
+POSTHOOK: query: DROP TABLE calls
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@calls
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@calls
+PREHOOK: query: DROP TABLE display
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@display
+PREHOOK: Output: database:default
+PREHOOK: Output: default@display
+POSTHOOK: query: DROP TABLE display
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@display
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@display
+PREHOOK: query: CREATE EXTERNAL TABLE calls_v2 (
+  s_key bigint,
+  year int
+) PARTITIONED BY SPEC (year)
+STORED BY Iceberg STORED AS parquet
+TBLPROPERTIES ('format-version'='2')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@calls_v2
+POSTHOOK: query: CREATE EXTERNAL TABLE calls_v2 (
+  s_key bigint,
+  year int
+) PARTITIONED BY SPEC (year)
+STORED BY Iceberg STORED AS parquet
+TBLPROPERTIES ('format-version'='2')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@calls_v2
+PREHOOK: query: INSERT INTO calls_v2 (s_key, year) VALUES (1, 2022), (2, 2023), (3, 2024),  (4, 2024), (5, 2025), (1, 2022), (2, 2023),
+(3, 2024),  (4, 2024), (5, 2025)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@calls_v2
+POSTHOOK: query: INSERT INTO calls_v2 (s_key, year) VALUES (1, 2022), (2, 2023), (3, 2024),  (4, 2024), (5, 2025), (1, 2022), (2, 2023),
+(3, 2024),  (4, 2024), (5, 2025)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@calls_v2
+PREHOOK: query: update calls_v2 set s_key=10 where year=2023
+PREHOOK: type: QUERY
+PREHOOK: Input: default@calls_v2
+PREHOOK: Output: default@calls_v2
+PREHOOK: Output: default@calls_v2
+POSTHOOK: query: update calls_v2 set s_key=10 where year=2023
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@calls_v2
+POSTHOOK: Output: default@calls_v2
+POSTHOOK: Output: default@calls_v2
+PREHOOK: query: delete from calls_v2 where year=2024
+PREHOOK: type: QUERY
+PREHOOK: Input: default@calls_v2
+PREHOOK: Output: default@calls_v2
+POSTHOOK: query: delete from calls_v2 where year=2024
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@calls_v2
+POSTHOOK: Output: default@calls_v2
+PREHOOK: query: select s.operation, s.summary['added-records'], s.summary['deleted-records'] from default.calls_v2.snapshots s
+  order by s.snapshot_id
+PREHOOK: type: QUERY
+PREHOOK: Input: default@calls_v2
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select s.operation, s.summary['added-records'], s.summary['deleted-records'] from default.calls_v2.snapshots s
+  order by s.snapshot_id
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@calls_v2
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+append 10      NULL
+overwrite      2       NULL
+overwrite      NULL    NULL
+PREHOOK: query: DROP TABLE calls_v2
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@calls_v2
+PREHOOK: Output: database:default
+PREHOOK: Output: default@calls_v2
+POSTHOOK: query: DROP TABLE calls_v2
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@calls_v2
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@calls_v2
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index 94e8e0edc2b..1b1c6e23b48 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -35,6 +35,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.antlr.runtime.TokenRewriteStream;
+import org.apache.commons.compress.utils.Lists;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
@@ -124,7 +125,7 @@ public class Context {
   // Some statements, e.g., UPDATE, DELETE, or MERGE, get rewritten into different
   // subqueries that create new contexts. We keep them here so we can clean them
   // up when we are done.
-  private final Set<Context> subContexts;
+  private final List<Context> subContexts;
 
   private String replPolicy;
 
@@ -231,7 +232,7 @@ public class Context {
    * These ops require special handling in various places
    * (note that Insert into Acid table is in OTHER category)
    */
-  public enum Operation {UPDATE, DELETE, MERGE, OTHER}
+  public enum Operation {UPDATE, DELETE, MERGE, IOW, OTHER}
   public enum DestClausePrefix {
     INSERT("insclause-"), UPDATE("updclause-"), DELETE("delclause-");
     private final String prefix;
@@ -373,7 +374,7 @@ public class Context {
   private Context(Configuration conf, String executionId) {
     this.conf = conf;
     this.executionId = executionId;
-    this.subContexts = new HashSet<>();
+    this.subContexts = Lists.newArrayList();
 
     // local & non-local tmp location is configurable. however it is the same across
     // all external file systems
@@ -433,7 +434,7 @@ public class Context {
     this.statsSource = ctx.statsSource;
     this.executionIndex = ctx.executionIndex;
     this.viewsTokenRewriteStreams = new HashMap<>();
-    this.subContexts = new HashSet<>();
+    this.subContexts = Lists.newArrayList();
     this.opContext = new CompilationOpContext();
     this.enableUnparse = ctx.enableUnparse;
     this.scheduledQuery = ctx.scheduledQuery;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index 5d167695f92..9cdf7f05d22 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -34,6 +34,8 @@ import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.Context.Operation;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.ddl.DDLUtils;
 import org.apache.hadoop.hive.ql.ddl.table.create.CreateTableDesc;
@@ -1063,7 +1065,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
   private boolean checkAndCommitNatively(MoveWork moveWork, Configuration configuration) throws HiveException {
     String storageHandlerClass = null;
     Properties commitProperties = null;
-    boolean overwrite = false;
+    Operation operation = context.getOperation();
     LoadTableDesc loadTableWork = moveWork.getLoadTableWork();
     if (loadTableWork != null) {
       if (loadTableWork.isUseAppendForLoad()) {
@@ -1077,7 +1079,9 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
       storageHandlerClass = tableDesc.getProperties().getProperty(
           org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE);
       commitProperties = new Properties(tableDesc.getProperties());
-      overwrite = moveWork.getLoadTableWork().isInsertOverwrite();
+      if (moveWork.getLoadTableWork().isInsertOverwrite()) {
+        operation = Operation.IOW;
+      }
     } else if (moveWork.getLoadFileWork() != null) {
       // Get the info from the create table data
       CreateTableDesc createTableDesc = moveWork.getLoadFileWork().getCtasCreateTableDesc();
@@ -1105,7 +1109,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
     if (storageHandlerClass != null) {
       HiveStorageHandler storageHandler = HiveUtils.getStorageHandler(configuration, storageHandlerClass);
       if (storageHandler.commitInMoveTask()) {
-        storageHandler.storageHandlerCommit(commitProperties, overwrite);
+        storageHandler.storageHandlerCommit(commitProperties, operation);
         return true;
       }
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
index 16f347a8445..c2e5a2cfa12 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
@@ -543,17 +543,22 @@ public interface HiveStorageHandler extends Configurable {
   default boolean commitInMoveTask() {
     return false;
   }
-
+  
   /**
    * Commits the inserts for the non-native tables. Used in the {@link org.apache.hadoop.hive.ql.exec.MoveTask}.
    * @param commitProperties Commit properties which are needed for the handler based commit
-   * @param overwrite If this is an INSERT OVERWRITE then it is true
+   * @param operation the operation type
    * @throws HiveException If there is an error during commit
    */
-  default void storageHandlerCommit(Properties commitProperties, boolean overwrite) throws HiveException {
+  default void storageHandlerCommit(Properties commitProperties, Operation operation) throws HiveException {
     throw new UnsupportedOperationException();
   }
 
+  @Deprecated
+  default void storageHandlerCommit(Properties commitProperties, boolean overwrite) throws HiveException {
+    storageHandlerCommit(commitProperties, overwrite ? Operation.IOW : Operation.OTHER);
+  }
+
   /**
    * Checks whether a certain ALTER TABLE operation is supported by the storage handler implementation.
    *
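
For storage handler implementers, the interface change above replaces the boolean overwrite flag with Context.Operation; the deprecated boolean overload simply maps INSERT OVERWRITE to Operation.IOW. A rough sketch of a handler adopting the new hook (MyStorageHandler and its branching are hypothetical, not part of this patch):

    import java.util.Properties;
    import org.apache.hadoop.hive.ql.Context.Operation;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;

    // Hypothetical handler fragment showing the Operation-based commit hook.
    public abstract class MyStorageHandler implements HiveStorageHandler {

      @Override
      public boolean commitInMoveTask() {
        return true;  // ask MoveTask to delegate the commit to storageHandlerCommit
      }

      @Override
      public void storageHandlerCommit(Properties commitProperties, Operation operation) throws HiveException {
        switch (operation) {
          case IOW:
            // INSERT OVERWRITE: replace the existing data
            break;
          case UPDATE:
          case DELETE:
          case MERGE:
            // row-level change: publish data and delete files atomically with conflict checks
            break;
          default:
            // plain insert: append the new files
        }
      }
    }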
