This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 3d3acc7a193 HIVE-27589: Iceberg: Branches of Merge/Update statements should be committed atomically (Simhadri Govindappa, Denys Kuzmenko, reviewed by Krisztian Kasa, Butao Zhang) 3d3acc7a193 is described below commit 3d3acc7a19399d749a39818573a76a0dbbaf2598 Author: Simhadri Govindappa <simhadri...@gmail.com> AuthorDate: Mon Aug 21 17:56:03 2023 +0530 HIVE-27589: Iceberg: Branches of Merge/Update statements should be committed atomically (Simhadri Govindappa, Denys Kuzmenko, reviewed by Krisztian Kasa, Butao Zhang) Closes #4575 --- .../org/apache/iceberg/mr/InputFormatConfig.java | 1 - .../mr/hive/HiveIcebergOutputCommitter.java | 202 +++++++++-------- .../iceberg/mr/hive/HiveIcebergStorageHandler.java | 15 +- .../hive/HiveIcebergStorageHandlerTestUtils.java | 2 +- .../apache/iceberg/mr/hive/TestHiveIcebergV2.java | 119 ++++++++++ .../org/apache/iceberg/mr/hive/TestHiveShell.java | 26 ++- .../queries/positive/iceberg_atomic_merge_update.q | 99 ++++++++ .../positive/iceberg_atomic_merge_update.q.out | 248 +++++++++++++++++++++ ql/src/java/org/apache/hadoop/hive/ql/Context.java | 9 +- .../org/apache/hadoop/hive/ql/exec/MoveTask.java | 10 +- .../hive/ql/metadata/HiveStorageHandler.java | 11 +- 11 files changed, 623 insertions(+), 119 deletions(-) diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java index eb212766c7c..831edd83d0c 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java @@ -68,7 +68,6 @@ public class InputFormatConfig { public static final String COMMIT_FILE_THREAD_POOL_SIZE = "iceberg.mr.commit.file.thread.pool.size"; public static final int COMMIT_FILE_THREAD_POOL_SIZE_DEFAULT = 10; public static final String WRITE_TARGET_FILE_SIZE = "iceberg.mr.write.target.file.size"; - public static final String IS_OVERWRITE = "iceberg.mr.write.is.overwrite"; public static final String CASE_SENSITIVE = "iceberg.mr.case.sensitive"; public static final boolean CASE_SENSITIVE_DEFAULT = true; diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java index 1ac8a3225ec..db62dcef1e9 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java @@ -22,6 +22,7 @@ package org.apache.iceberg.mr.hive; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import java.util.AbstractMap.SimpleImmutableEntry; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -35,15 +36,14 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.stream.Collectors; -import java.util.stream.Stream; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.Context.Operation; import 
org.apache.hadoop.hive.ql.metadata.HiveUtils; -import org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils; import org.apache.hadoop.hive.ql.session.SessionStateUtil; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobContext; @@ -53,12 +53,13 @@ import org.apache.hadoop.mapred.TaskAttemptID; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.TaskType; import org.apache.iceberg.AppendFiles; -import org.apache.iceberg.ContentFile; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.DeleteFiles; import org.apache.iceberg.ReplacePartitions; import org.apache.iceberg.RowDelta; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotRef; import org.apache.iceberg.Table; import org.apache.iceberg.exceptions.NotFoundException; import org.apache.iceberg.expressions.Expressions; @@ -71,9 +72,9 @@ import org.apache.iceberg.mr.hive.writer.HiveIcebergWriter; import org.apache.iceberg.mr.hive.writer.WriterRegistry; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.iceberg.util.Tasks; import org.slf4j.Logger; @@ -189,7 +190,7 @@ public class HiveIcebergOutputCommitter extends OutputCommitter { @Override public void commitJob(JobContext originalContext) throws IOException { - commitJobs(Collections.singletonList(originalContext)); + commitJobs(Collections.singletonList(originalContext), Operation.OTHER); } /** @@ -200,16 +201,16 @@ public class HiveIcebergOutputCommitter extends OutputCommitter { private final String catalogName; private final String tableName; private final Table table; - private final JobContext jobContext; - private final SessionStateUtil.CommitInfo commitInfo; + private List<JobContext> jobContexts; - private OutputTable(String catalogName, String tableName, Table table, JobContext jobContext, - SessionStateUtil.CommitInfo commitInfo) { + private OutputTable(String catalogName, String tableName, Table table) { this.catalogName = catalogName; this.tableName = tableName; this.table = table; - this.jobContext = jobContext; - this.commitInfo = commitInfo; + } + + public void setJobContexts(List<JobContext> jobContexts) { + this.jobContexts = ImmutableList.copyOf(jobContexts); } @Override @@ -221,17 +222,12 @@ public class HiveIcebergOutputCommitter extends OutputCommitter { return false; } OutputTable output1 = (OutputTable) o; - return Objects.equals(tableName, output1.tableName) && - Objects.equals(jobContext.getJobID(), output1.jobContext.getJobID()); + return Objects.equals(tableName, output1.tableName); } @Override public int hashCode() { - return Objects.hash(tableName, jobContext.getJobID()); - } - - public Optional<SessionStateUtil.CommitInfo> getCommitInfo() { - return Optional.ofNullable(commitInfo); + return Objects.hash(tableName); } } @@ -241,11 +237,11 @@ public class HiveIcebergOutputCommitter extends OutputCommitter { * @param originalContextList The job context list * @throws IOException if there is a failure accessing the files */ 
- public void commitJobs(List<JobContext> originalContextList) throws IOException { + public void commitJobs(List<JobContext> originalContextList, Operation operation) throws IOException { List<JobContext> jobContextList = originalContextList.stream() .map(TezUtil::enrichContextWithVertexId) .collect(Collectors.toList()); - Set<OutputTable> outputs = collectOutputs(jobContextList); + List<OutputTable> outputs = collectOutputs(jobContextList); long startTime = System.currentTimeMillis(); String ids = jobContextList.stream() @@ -253,7 +249,6 @@ public class HiveIcebergOutputCommitter extends OutputCommitter { LOG.info("Committing job(s) {} has started", ids); Collection<String> jobLocations = new ConcurrentLinkedQueue<>(); - ExecutorService fileExecutor = fileExecutor(jobContextList.get(0).getJobConf()); ExecutorService tableExecutor = tableExecutor(jobContextList.get(0).getJobConf(), outputs.size()); try { @@ -263,10 +258,12 @@ public class HiveIcebergOutputCommitter extends OutputCommitter { .stopOnFailure() .executeWith(tableExecutor) .run(output -> { - JobConf jobConf = output.jobContext.getJobConf(); Table table = output.table; - jobLocations.add(generateJobLocation(table.location(), jobConf, output.jobContext.getJobID())); - commitTable(table.io(), fileExecutor, output); + jobLocations.addAll( + output.jobContexts.stream().map(jobContext -> + generateJobLocation(table.location(), jobContext.getJobConf(), jobContext.getJobID())) + .collect(Collectors.toList())); + commitTable(table.io(), fileExecutor, output, operation); }); } finally { fileExecutor.shutdown(); @@ -282,30 +279,27 @@ public class HiveIcebergOutputCommitter extends OutputCommitter { } } - private Set<OutputTable> collectOutputs(List<JobContext> jobContextList) { - Set<OutputTable> outputs = Sets.newHashSet(); - for (JobContext jobContext : jobContextList) { - Set<String> outputNames = HiveIcebergStorageHandler.outputTables(jobContext.getJobConf()); - for (String output : outputNames) { - Table table = SessionStateUtil.getResource(jobContext.getJobConf(), output) + private List<OutputTable> collectOutputs(List<JobContext> jobContextList) { + return jobContextList.stream() + .flatMap(jobContext -> HiveIcebergStorageHandler.outputTables(jobContext.getJobConf()).stream() + .map(output -> new OutputTable( + HiveIcebergStorageHandler.catalogName(jobContext.getJobConf(), output), + output, + SessionStateUtil.getResource(jobContext.getJobConf(), output) .filter(o -> o instanceof Table).map(o -> (Table) o) // fall back to getting the serialized table from the config - .orElseGet(() -> HiveIcebergStorageHandler.table(jobContext.getJobConf(), output)); - if (table == null) { - LOG.info("CommitJob found no table object in QueryState or conf for: {}. 
Skipping job commit.", output); - continue; - } - - SessionStateUtil.CommitInfo commitInfo = null; - if (SessionStateUtil.getCommitInfo(jobContext.getJobConf(), output).isPresent()) { - commitInfo = SessionStateUtil.getCommitInfo(jobContext.getJobConf(), output) - .get().get(jobContext.getJobID().toString()); - } - String catalogName = HiveIcebergStorageHandler.catalogName(jobContext.getJobConf(), output); - outputs.add(new OutputTable(catalogName, output, table, jobContext, commitInfo)); - } - } - return outputs; + .orElseGet(() -> HiveIcebergStorageHandler.table(jobContext.getJobConf(), output)))) + .filter(output -> Objects.nonNull(output.table)) + .map(output -> new SimpleImmutableEntry<>(output, jobContext))) + .collect( + Collectors.groupingBy(Map.Entry::getKey, + Collectors.mapping(Map.Entry::getValue, Collectors.toList())) + ).entrySet().stream().map( + kv -> { + kv.getKey().setJobContexts(kv.getValue()); + return kv.getKey(); + }) + .collect(Collectors.toList()); } /** @@ -324,7 +318,7 @@ public class HiveIcebergOutputCommitter extends OutputCommitter { List<JobContext> jobContextList = originalContextList.stream() .map(TezUtil::enrichContextWithVertexId) .collect(Collectors.toList()); - Set<OutputTable> outputs = collectOutputs(jobContextList); + List<OutputTable> outputs = collectOutputs(jobContextList); String ids = jobContextList.stream() .map(jobContext -> jobContext.getJobID().toString()).collect(Collectors.joining(",")); @@ -336,16 +330,17 @@ public class HiveIcebergOutputCommitter extends OutputCommitter { ExecutorService tableExecutor = tableExecutor(jobContextList.get(0).getJobConf(), outputs.size()); try { // Cleans up the changes for the output tables in parallel - Tasks.foreach(outputs) + Tasks.foreach(outputs.stream().flatMap(kv -> kv.jobContexts.stream() + .map(jobContext -> new SimpleImmutableEntry<>(kv.table, jobContext)))) .suppressFailureWhenFinished() .executeWith(tableExecutor) .onFailure((output, exc) -> LOG.warn("Failed cleanup table {} on abort job", output, exc)) .run(output -> { - JobContext jobContext = output.jobContext; + JobContext jobContext = output.getValue(); JobConf jobConf = jobContext.getJobConf(); LOG.info("Cleaning job for jobID: {}, table: {}", jobContext.getJobID(), output); - Table table = output.table; + Table table = output.getKey(); String jobLocation = generateJobLocation(table.location(), jobConf, jobContext.getJobID()); jobLocations.add(jobLocation); // list jobLocation to get number of forCommit files @@ -354,17 +349,12 @@ public class HiveIcebergOutputCommitter extends OutputCommitter { FilesForCommit results = collectResults(numTasks, fileExecutor, table.location(), jobContext, table.io(), false); // Check if we have files already written and remove data and delta files if there are any - Collection<ContentFile> files = Stream.concat(results.dataFiles().stream(), results.deleteFiles().stream()) - .collect(Collectors.toList()); - - if (files.size() > 0) { - Tasks.foreach(files) - .retry(3) - .suppressFailureWhenFinished() - .executeWith(fileExecutor) - .onFailure((file, exc) -> LOG.warn("Failed to remove data file {} on abort job", file.path(), exc)) - .run(file -> table.io().deleteFile(file.path().toString())); - } + Tasks.foreach(results.allFiles()) + .retry(3) + .suppressFailureWhenFinished() + .executeWith(fileExecutor) + .onFailure((file, exc) -> LOG.warn("Failed to remove data file {} on abort job", file.path(), exc)) + .run(file -> table.io().deleteFile(file.path().toString())); }, IOException.class); } finally { 
fileExecutor.shutdown(); @@ -405,48 +395,71 @@ public class HiveIcebergOutputCommitter extends OutputCommitter { * @param executor The executor used to read the forCommit files * @param outputTable The table used for loading from the catalog */ - private void commitTable(FileIO io, ExecutorService executor, OutputTable outputTable) { + private void commitTable(FileIO io, ExecutorService executor, OutputTable outputTable, Operation operation) { String name = outputTable.tableName; - JobContext jobContext = outputTable.jobContext; - JobConf conf = jobContext.getJobConf(); Properties catalogProperties = new Properties(); catalogProperties.put(Catalogs.NAME, name); catalogProperties.put(Catalogs.LOCATION, outputTable.table.location()); if (outputTable.catalogName != null) { catalogProperties.put(InputFormatConfig.CATALOG_NAME, outputTable.catalogName); } - Table table = Catalogs.loadTable(conf, catalogProperties); + List<DataFile> dataFiles = Lists.newArrayList(); + List<DeleteFile> deleteFiles = Lists.newArrayList(); + + Table table = null; + String branchName = null; + + for (JobContext jobContext : outputTable.jobContexts) { + JobConf conf = jobContext.getJobConf(); + table = Optional.ofNullable(table).orElse(Catalogs.loadTable(conf, catalogProperties)); + branchName = conf.get(InputFormatConfig.OUTPUT_TABLE_SNAPSHOT_REF); + + LOG.info("Committing job has started for table: {}, using location: {}", + table, generateJobLocation(outputTable.table.location(), conf, jobContext.getJobID())); + + int numTasks = SessionStateUtil.getCommitInfo(conf, name).map(info -> info.get(jobContext.getJobID().toString())) + .map(SessionStateUtil.CommitInfo::getTaskNum).orElseGet(() -> { + // Fallback logic, if number of tasks are not available in the config + // If there are reducers, then every reducer will generate a result file. + // If this is a map only task, then every mapper will generate a result file. + LOG.info("Number of tasks not available in session state for jobID: {}, table: {}. " + + "Falling back to jobConf numReduceTasks/numMapTasks", jobContext.getJobID(), name); + return conf.getNumReduceTasks() > 0 ? conf.getNumReduceTasks() : conf.getNumMapTasks(); + }); + FilesForCommit writeResults = collectResults( + numTasks, executor, outputTable.table.location(), jobContext, io, true); + dataFiles.addAll(writeResults.dataFiles()); + deleteFiles.addAll(writeResults.deleteFiles()); + } + + FilesForCommit filesForCommit = new FilesForCommit(dataFiles, deleteFiles); long startTime = System.currentTimeMillis(); - LOG.info("Committing job has started for table: {}, using location: {}", - table, generateJobLocation(outputTable.table.location(), conf, jobContext.getJobID())); - - Optional<SessionStateUtil.CommitInfo> commitInfo = outputTable.getCommitInfo(); - int numTasks = commitInfo.map(SessionStateUtil.CommitInfo::getTaskNum).orElseGet(() -> { - // Fallback logic, if number of tasks are not available in the config - // If there are reducers, then every reducer will generate a result file. - // If this is a map only task, then every mapper will generate a result file. - LOG.info("Number of tasks not available in session state for jobID: {}, table: {}. Falling back to jobConf " + - "numReduceTasks/numMapTasks", jobContext.getJobID(), name); - return conf.getNumReduceTasks() > 0 ? 
conf.getNumReduceTasks() : conf.getNumMapTasks(); - }); - - FilesForCommit writeResults = collectResults( - numTasks, executor, outputTable.table.location(), jobContext, io, true); - String branchName = conf.get(InputFormatConfig.OUTPUT_TABLE_SNAPSHOT_REF); - if (!conf.getBoolean(InputFormatConfig.IS_OVERWRITE, false)) { - if (writeResults.isEmpty()) { + + if (Operation.IOW != operation) { + if (filesForCommit.isEmpty()) { LOG.info( - "Not creating a new commit for table: {}, jobID: {}, operation: {}, since there were no new files to add", - table, jobContext.getJobID(), HiveCustomStorageHandlerUtils.getWriteOperation(conf, name)); + "Not creating a new commit for table: {}, jobIDs: {}, since there were no new files to add", + table, outputTable.jobContexts.stream().map(JobContext::getJobID) + .map(String::valueOf).collect(Collectors.joining(","))); } else { - commitWrite(table, branchName, startTime, writeResults); + Long snapshotId = getSnapshotId(outputTable.table, branchName); + commitWrite(table, branchName, snapshotId, startTime, filesForCommit, operation); } } else { - commitOverwrite(table, branchName, startTime, writeResults); + commitOverwrite(table, branchName, startTime, filesForCommit); } } + private Long getSnapshotId(Table table, String branchName) { + Optional<Long> snapshotId = Optional.ofNullable(table.currentSnapshot()).map(Snapshot::snapshotId); + if (StringUtils.isNotEmpty(branchName)) { + String ref = HiveUtils.getTableSnapshotRef(branchName); + snapshotId = Optional.ofNullable(table.refs().get(ref)).map(SnapshotRef::snapshotId); + } + return snapshotId.orElse(null); + } + /** * Creates and commits an Iceberg change with the provided data and delete files. * If there are no delete files then an Iceberg 'append' is created, otherwise Iceberg 'overwrite' is created. 
@@ -454,14 +467,17 @@ public class HiveIcebergOutputCommitter extends OutputCommitter { * @param startTime The start time of the commit - used only for logging * @param results The object containing the new files we would like to add to the table */ - private void commitWrite(Table table, String branchName, long startTime, FilesForCommit results) { - if (results.deleteFiles().isEmpty()) { + private void commitWrite(Table table, String branchName, Long snapshotId, long startTime, + FilesForCommit results, Operation operation) { + + if (results.deleteFiles().isEmpty() && Operation.MERGE != operation) { AppendFiles write = table.newAppend(); results.dataFiles().forEach(write::appendFile); if (StringUtils.isNotEmpty(branchName)) { write.toBranch(HiveUtils.getTableSnapshotRef(branchName)); } write.commit(); + } else { RowDelta write = table.newRowDelta(); results.dataFiles().forEach(write::addRows); @@ -469,6 +485,14 @@ public class HiveIcebergOutputCommitter extends OutputCommitter { if (StringUtils.isNotEmpty(branchName)) { write.toBranch(HiveUtils.getTableSnapshotRef(branchName)); } + if (snapshotId != null) { + write.validateFromSnapshot(snapshotId); + } + if (!results.dataFiles().isEmpty()) { + write.validateDeletedFiles(); + write.validateNoConflictingDeleteFiles(); + } + write.validateNoConflictingDataFiles(); write.commit(); } diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java index ef74cf6bf31..19d25b6fbd0 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java @@ -98,6 +98,7 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicListDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; +import org.apache.hadoop.hive.ql.plan.HiveOperation; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider; import org.apache.hadoop.hive.ql.session.SessionState; @@ -563,7 +564,9 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H // Analyze table and stats updater thread return fs.delete(statsPath, true); } - return false; + return SessionStateUtil.getQueryState(conf).map(QueryState::getHiveOperation) + .filter(opType -> HiveOperation.ANALYZE_TABLE == opType) + .isPresent(); } private void checkAndMergeColStats(ColumnStatistics statsObjNew, Table tbl) throws InvalidObjectException { @@ -703,7 +706,8 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H } @Override - public void storageHandlerCommit(Properties commitProperties, boolean overwrite) throws HiveException { + public void storageHandlerCommit(Properties commitProperties, Operation operation) + throws HiveException { String tableName = commitProperties.getProperty(Catalogs.NAME); String location = commitProperties.getProperty(Catalogs.LOCATION); String snapshotRef = commitProperties.getProperty(Catalogs.SNAPSHOT_REF); @@ -711,14 +715,14 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H if (location != null) { HiveTableUtil.cleanupTableObjectFile(location, configuration); } - List<JobContext> jobContextList = generateJobContext(configuration, 
tableName, snapshotRef, overwrite); + List<JobContext> jobContextList = generateJobContext(configuration, tableName, snapshotRef); if (jobContextList.isEmpty()) { return; } HiveIcebergOutputCommitter committer = new HiveIcebergOutputCommitter(); try { - committer.commitJobs(jobContextList); + committer.commitJobs(jobContextList, operation); } catch (Throwable e) { String ids = jobContextList .stream().map(jobContext -> jobContext.getJobID().toString()).collect(Collectors.joining(", ")); @@ -1394,7 +1398,7 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H * @return The generated Optional JobContext list or empty if not presents. */ private List<JobContext> generateJobContext(Configuration configuration, String tableName, - String branchName, boolean overwrite) { + String branchName) { JobConf jobConf = new JobConf(configuration); Optional<Map<String, SessionStateUtil.CommitInfo>> commitInfoMap = SessionStateUtil.getCommitInfo(jobConf, tableName); @@ -1403,7 +1407,6 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H for (SessionStateUtil.CommitInfo commitInfo : commitInfoMap.get().values()) { JobID jobID = JobID.forName(commitInfo.getJobIdStr()); commitInfo.getProps().forEach(jobConf::set); - jobConf.setBoolean(InputFormatConfig.IS_OVERWRITE, overwrite); // we should only commit this current table because // for multi-table inserts, this hook method will be called sequentially for each target table diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerTestUtils.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerTestUtils.java index 572564542e8..2010b7cac08 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerTestUtils.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerTestUtils.java @@ -106,7 +106,7 @@ public class HiveIcebergStorageHandlerTestUtils { } static void init(TestHiveShell shell, TestTables testTables, TemporaryFolder temp, String engine) { - shell.openSession(); + shell.getSession(); for (Map.Entry<String, String> property : testTables.properties().entrySet()) { shell.setHiveSessionValue(property.getKey(), property.getValue()); diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergV2.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergV2.java index 721e3d012ee..3f474cfe79a 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergV2.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergV2.java @@ -21,7 +21,9 @@ package org.apache.iceberg.mr.hive; import java.io.IOException; import java.util.List; +import java.util.concurrent.Executors; import java.util.stream.StreamSupport; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; @@ -33,16 +35,20 @@ import org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.data.Record; import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.mr.TestHelper; +import org.apache.iceberg.relocated.com.google.common.base.Throwables; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; 
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.Tasks; import org.apache.thrift.TException; import org.junit.Assert; import org.junit.Assume; import org.junit.Test; +import static org.apache.iceberg.mr.hive.HiveIcebergStorageHandlerTestUtils.init; import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; @@ -646,6 +652,119 @@ public class TestHiveIcebergV2 extends HiveIcebergStorageHandlerWithEngineBase { shell.executeStatement("select count(*) from customers"); } + @Test + public void testConcurrent2Deletes() { + Assume.assumeTrue(fileFormat == FileFormat.PARQUET && isVectorized && + testTableType == TestTables.TestTableType.HIVE_CATALOG); + + testTables.createTable(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, + PartitionSpec.unpartitioned(), fileFormat, HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_2, 2); + String sql = "DELETE FROM customers WHERE customer_id=3 or first_name='Joanna'"; + + Tasks.range(2) + .executeWith(Executors.newFixedThreadPool(2)) + .run(i -> { + init(shell, testTables, temp, executionEngine); + HiveConf.setBoolVar(shell.getHiveConf(), HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, isVectorized); + HiveConf.setVar(shell.getHiveConf(), HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none"); + shell.executeStatement(sql); + shell.closeSession(); + }); + List<Object[]> res = shell.executeStatement("SELECT * FROM customers"); + Assert.assertEquals(4, res.size()); + } + + @Test + public void testConcurrent2Updates() { + Assume.assumeTrue(fileFormat == FileFormat.PARQUET && isVectorized && + testTableType == TestTables.TestTableType.HIVE_CATALOG); + + testTables.createTable(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, + PartitionSpec.unpartitioned(), fileFormat, HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_2, 2); + String sql = "UPDATE customers SET last_name='Changed' WHERE customer_id=3 or first_name='Joanna'"; + try { + Tasks.range(2) + .executeWith(Executors.newFixedThreadPool(2)) + .run(i -> { + init(shell, testTables, temp, executionEngine); + HiveConf.setBoolVar(shell.getHiveConf(), HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, isVectorized); + HiveConf.setVar(shell.getHiveConf(), HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none"); + shell.executeStatement(sql); + shell.closeSession(); + }); + } catch (Throwable ex) { + Throwable cause = Throwables.getRootCause(ex); + Assert.assertTrue(cause instanceof ValidationException); + Assert.assertTrue(cause.getMessage().matches("^Found.*conflicting.*files(.*)")); + } + List<Object[]> res = shell.executeStatement("SELECT * FROM customers WHERE last_name='Changed'"); + Assert.assertEquals(5, res.size()); + } + + @Test + public void testConcurrentUpdateAndDelete() { + Assume.assumeTrue(fileFormat == FileFormat.PARQUET && isVectorized && + testTableType == TestTables.TestTableType.HIVE_CATALOG); + + testTables.createTable(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, + PartitionSpec.unpartitioned(), fileFormat, HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_2, 2); + String[] sql = new String[]{ + "DELETE FROM customers WHERE customer_id=3 or first_name='Joanna'", + "UPDATE customers SET last_name='Changed' WHERE customer_id=3 or first_name='Joanna'" + }; + + boolean deleteFirst = false; + try { + 
Tasks.range(2) + .executeWith(Executors.newFixedThreadPool(2)) + .run(i -> { + init(shell, testTables, temp, executionEngine); + HiveConf.setBoolVar(shell.getHiveConf(), HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, isVectorized); + HiveConf.setVar(shell.getHiveConf(), HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none"); + shell.executeStatement(sql[i]); + shell.closeSession(); + }); + } catch (Throwable ex) { + Throwable cause = Throwables.getRootCause(ex); + Assert.assertTrue(cause instanceof ValidationException); + Assert.assertTrue(cause.getMessage().matches("^Found.*conflicting.*files(.*)")); + deleteFirst = cause.getMessage().contains("conflicting delete"); + } + List<Object[]> res = shell.executeStatement("SELECT * FROM customers WHERE last_name='Changed'"); + Assert.assertEquals(deleteFirst ? 0 : 5, res.size()); + } + + @Test + public void testConcurrent2MergeInserts() { + Assume.assumeTrue(fileFormat == FileFormat.PARQUET && isVectorized && + testTableType == TestTables.TestTableType.HIVE_CATALOG); + + testTables.createTable(shell, "source", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, + PartitionSpec.unpartitioned(), fileFormat, HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_1); + testTables.createTable(shell, "target", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, + PartitionSpec.unpartitioned(), fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 2); + + String sql = "MERGE INTO target t USING source s on t.customer_id = s.customer_id WHEN Not MATCHED THEN " + + "INSERT values (s.customer_id, s.first_name, s.last_name)"; + try { + Tasks.range(2) + .executeWith(Executors.newFixedThreadPool(2)) + .run(i -> { + init(shell, testTables, temp, executionEngine); + HiveConf.setBoolVar(shell.getHiveConf(), HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, isVectorized); + HiveConf.setVar(shell.getHiveConf(), HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none"); + shell.executeStatement(sql); + shell.closeSession(); + }); + } catch (Throwable ex) { + Throwable cause = Throwables.getRootCause(ex); + Assert.assertTrue(cause instanceof ValidationException); + Assert.assertTrue(cause.getMessage().startsWith("Found conflicting files")); + } + List<Object[]> res = shell.executeStatement("SELECT * FROM target"); + Assert.assertEquals(6, res.size()); + } + private static <T> PositionDelete<T> positionDelete(CharSequence path, long pos, T row) { PositionDelete<T> positionDelete = PositionDelete.create(); return positionDelete.set(path, pos, row); diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java index dbb92305e2d..79e477bbe59 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java @@ -54,9 +54,10 @@ public class TestHiveShell { private final HiveServer2 hs2; private final HiveConf hs2Conf; private CLIService client; - private HiveSession session; private boolean started; + private ThreadLocal<HiveSession> session = ThreadLocal.withInitial(this::openSession); + public TestHiveShell() { metastore = new TestHiveMetastore(); hs2Conf = initializeConf(); @@ -69,9 +70,9 @@ public class TestHiveShell { } public void setHiveSessionValue(String key, String value) { - Preconditions.checkState(session != null, "There is no open session for setting variables."); + Preconditions.checkState(session.get() != null, "There is 
no open session for setting variables."); try { - session.getSessionConf().set(key, value); + session.get().getSessionConf().set(key, value); } catch (Exception e) { throw new RuntimeException("Unable to set Hive session variable: ", e); } @@ -116,32 +117,33 @@ public class TestHiveShell { return metastore; } - public void openSession() { + private HiveSession openSession() { Preconditions.checkState(started, "You have to start TestHiveShell first, before opening a session."); try { SessionHandle sessionHandle = client.getSessionManager().openSession( CLIService.SERVER_VERSION, "", "", "127.0.0.1", Collections.emptyMap()); - session = client.getSessionManager().getSession(sessionHandle); + return client.getSessionManager().getSession(sessionHandle); } catch (Exception e) { throw new RuntimeException("Unable to open new Hive session: ", e); } } public void closeSession() { - Preconditions.checkState(session != null, "There is no open session to be closed."); + Preconditions.checkState(session.get() != null, "There is no open session to be closed."); try { - session.close(); - session = null; + session.get().close(); + session.remove(); } catch (Exception e) { throw new RuntimeException("Unable to close Hive session: ", e); } } public List<Object[]> executeStatement(String statement) { - Preconditions.checkState(session != null, + Preconditions.checkState(session.get() != null, "You have to start TestHiveShell and open a session first, before running a query."); try { - OperationHandle handle = client.executeStatement(session.getSessionHandle(), statement, Collections.emptyMap()); + OperationHandle handle = client.executeStatement(session.get().getSessionHandle(), statement, + Collections.emptyMap()); List<Object[]> resultSet = Lists.newArrayList(); if (handle.hasResultSet()) { RowSet rowSet; @@ -172,14 +174,14 @@ public class TestHiveShell { public Configuration getHiveConf() { if (session != null) { - return session.getHiveConf(); + return session.get().getHiveConf(); } else { return hs2Conf; } } public HiveSession getSession() { - return session; + return session.get(); } private HiveConf initializeConf() { diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_atomic_merge_update.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_atomic_merge_update.q new file mode 100644 index 00000000000..e0809721a53 --- /dev/null +++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_atomic_merge_update.q @@ -0,0 +1,99 @@ +-- SORT_QUERY_RESULTS + +-- Mask neededVirtualColumns due to non-strict order +--! 
qt:replace:/(\s+neededVirtualColumns:\s)(.*)/$1#Masked#/ + +set hive.optimize.shared.work.merge.ts.schema=true; +set hive.vectorized.execution.enabled=true; + +CREATE EXTERNAL TABLE calls ( + s_key bigint, + year int +) PARTITIONED BY SPEC (year) +STORED BY Iceberg STORED AS parquet +TBLPROPERTIES ('format-version'='2'); + +INSERT INTO calls (s_key, year) VALUES (1090969, 2022); + + +CREATE EXTERNAL TABLE display ( + skey bigint, + hierarchy_number string, + hierarchy_name string, + language_id int, + hierarchy_display string, + orderby string +) +STORED BY Iceberg STORED AS parquet +TBLPROPERTIES ('format-version'='2'); + +INSERT INTO display (skey, language_id, hierarchy_display) VALUES + (1090969, 3, 'f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1'), + (1090969, 3, 'f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1-insertnew1'); + + +MERGE INTO display USING ( + SELECT distinct display_skey, display, display as orig_display + FROM ( + SELECT D.skey display_skey, D.hierarchy_display display + FROM ( + SELECT s_key FROM calls WHERE s_key = 1090969 + ) R + INNER JOIN display D + ON R.s_key = D.skey AND D.language_id = 3 + GROUP BY D.skey, + D.hierarchy_display + ) sub1 + + UNION ALL + + SELECT distinct display_skey, null as display, display as orig_display + FROM ( + SELECT D.skey display_skey, D.hierarchy_display display + FROM ( + SELECT s_key FROM calls WHERE s_key = 1090969 + ) R + INNER JOIN display D + ON R.s_key = D.skey AND D.language_id = 3 + GROUP BY D.skey, + D.hierarchy_display + ) sub2 +) sub +ON display.skey = sub.display_skey + and display.hierarchy_display = sub.display + +WHEN MATCHED THEN + UPDATE SET hierarchy_display = concat(sub.display, '-mergeupdated1') +WHEN NOT MATCHED THEN + INSERT (skey, language_id, hierarchy_display) values (sub.display_skey, 3, concat(sub.orig_display, '-mergenew1')); + +select s.operation, s.summary['added-records'], s.summary['deleted-records'] from default.display.snapshots s + order by s.snapshot_id; + + +-- clean up +DROP TABLE calls; +DROP TABLE display; + + +-- Update + +CREATE EXTERNAL TABLE calls_v2 ( + s_key bigint, + year int +) PARTITIONED BY SPEC (year) +STORED BY Iceberg STORED AS parquet +TBLPROPERTIES ('format-version'='2'); + +INSERT INTO calls_v2 (s_key, year) VALUES (1, 2022), (2, 2023), (3, 2024), (4, 2024), (5, 2025), (1, 2022), (2, 2023), +(3, 2024), (4, 2024), (5, 2025); + +update calls_v2 set s_key=10 where year=2023; +delete from calls_v2 where year=2024; + +select s.operation, s.summary['added-records'], s.summary['deleted-records'] from default.calls_v2.snapshots s + order by s.snapshot_id; + +-- clean up + +DROP TABLE calls_v2 \ No newline at end of file diff --git a/iceberg/iceberg-handler/src/test/results/positive/iceberg_atomic_merge_update.q.out b/iceberg/iceberg-handler/src/test/results/positive/iceberg_atomic_merge_update.q.out new file mode 100644 index 00000000000..7bc1a8db135 --- /dev/null +++ b/iceberg/iceberg-handler/src/test/results/positive/iceberg_atomic_merge_update.q.out @@ -0,0 +1,248 @@ +PREHOOK: query: CREATE EXTERNAL TABLE calls ( + s_key bigint, + year int +) PARTITIONED BY SPEC (year) +STORED BY Iceberg STORED AS parquet +TBLPROPERTIES ('format-version'='2') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@calls +POSTHOOK: query: CREATE EXTERNAL TABLE calls ( + s_key bigint, + year int +) PARTITIONED BY SPEC (year) +STORED BY Iceberg STORED AS parquet +TBLPROPERTIES ('format-version'='2') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: 
database:default +POSTHOOK: Output: default@calls +PREHOOK: query: INSERT INTO calls (s_key, year) VALUES (1090969, 2022) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@calls +POSTHOOK: query: INSERT INTO calls (s_key, year) VALUES (1090969, 2022) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@calls +PREHOOK: query: CREATE EXTERNAL TABLE display ( + skey bigint, + hierarchy_number string, + hierarchy_name string, + language_id int, + hierarchy_display string, + orderby string +) +STORED BY Iceberg STORED AS parquet +TBLPROPERTIES ('format-version'='2') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@display +POSTHOOK: query: CREATE EXTERNAL TABLE display ( + skey bigint, + hierarchy_number string, + hierarchy_name string, + language_id int, + hierarchy_display string, + orderby string +) +STORED BY Iceberg STORED AS parquet +TBLPROPERTIES ('format-version'='2') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@display +PREHOOK: query: INSERT INTO display (skey, language_id, hierarchy_display) VALUES + (1090969, 3, 'f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1'), + (1090969, 3, 'f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1-insertnew1') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@display +POSTHOOK: query: INSERT INTO display (skey, language_id, hierarchy_display) VALUES + (1090969, 3, 'f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1'), + (1090969, 3, 'f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1-insertnew1') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@display +Warning: Shuffle Join MERGEJOIN[67][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 2' is a cross product +Warning: Shuffle Join MERGEJOIN[68][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 8' is a cross product +PREHOOK: query: MERGE INTO display USING ( + SELECT distinct display_skey, display, display as orig_display + FROM ( + SELECT D.skey display_skey, D.hierarchy_display display + FROM ( + SELECT s_key FROM calls WHERE s_key = 1090969 + ) R + INNER JOIN display D + ON R.s_key = D.skey AND D.language_id = 3 + GROUP BY D.skey, + D.hierarchy_display + ) sub1 + + UNION ALL + + SELECT distinct display_skey, null as display, display as orig_display + FROM ( + SELECT D.skey display_skey, D.hierarchy_display display + FROM ( + SELECT s_key FROM calls WHERE s_key = 1090969 + ) R + INNER JOIN display D + ON R.s_key = D.skey AND D.language_id = 3 + GROUP BY D.skey, + D.hierarchy_display + ) sub2 +) sub +ON display.skey = sub.display_skey + and display.hierarchy_display = sub.display + +WHEN MATCHED THEN + UPDATE SET hierarchy_display = concat(sub.display, '-mergeupdated1') +WHEN NOT MATCHED THEN + INSERT (skey, language_id, hierarchy_display) values (sub.display_skey, 3, concat(sub.orig_display, '-mergenew1')) +PREHOOK: type: QUERY +PREHOOK: Input: default@calls +PREHOOK: Input: default@display +PREHOOK: Output: default@display +PREHOOK: Output: default@display +PREHOOK: Output: default@merge_tmp_table +POSTHOOK: query: MERGE INTO display USING ( + SELECT distinct display_skey, display, display as orig_display + FROM ( + SELECT D.skey display_skey, D.hierarchy_display display + FROM ( + SELECT s_key FROM calls WHERE s_key = 1090969 + ) R + INNER JOIN display D + ON R.s_key = D.skey AND D.language_id = 3 + GROUP BY D.skey, + 
D.hierarchy_display + ) sub1 + + UNION ALL + + SELECT distinct display_skey, null as display, display as orig_display + FROM ( + SELECT D.skey display_skey, D.hierarchy_display display + FROM ( + SELECT s_key FROM calls WHERE s_key = 1090969 + ) R + INNER JOIN display D + ON R.s_key = D.skey AND D.language_id = 3 + GROUP BY D.skey, + D.hierarchy_display + ) sub2 +) sub +ON display.skey = sub.display_skey + and display.hierarchy_display = sub.display + +WHEN MATCHED THEN + UPDATE SET hierarchy_display = concat(sub.display, '-mergeupdated1') +WHEN NOT MATCHED THEN + INSERT (skey, language_id, hierarchy_display) values (sub.display_skey, 3, concat(sub.orig_display, '-mergenew1')) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@calls +POSTHOOK: Input: default@display +POSTHOOK: Output: default@display +POSTHOOK: Output: default@display +POSTHOOK: Output: default@merge_tmp_table +POSTHOOK: Lineage: merge_tmp_table.val EXPRESSION [(display)display.null, ] +PREHOOK: query: select s.operation, s.summary['added-records'], s.summary['deleted-records'] from default.display.snapshots s + order by s.snapshot_id +PREHOOK: type: QUERY +PREHOOK: Input: default@display +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select s.operation, s.summary['added-records'], s.summary['deleted-records'] from default.display.snapshots s + order by s.snapshot_id +POSTHOOK: type: QUERY +POSTHOOK: Input: default@display +POSTHOOK: Output: hdfs://### HDFS PATH ### +append 2 NULL +overwrite 4 NULL +PREHOOK: query: DROP TABLE calls +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@calls +PREHOOK: Output: database:default +PREHOOK: Output: default@calls +POSTHOOK: query: DROP TABLE calls +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@calls +POSTHOOK: Output: database:default +POSTHOOK: Output: default@calls +PREHOOK: query: DROP TABLE display +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@display +PREHOOK: Output: database:default +PREHOOK: Output: default@display +POSTHOOK: query: DROP TABLE display +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@display +POSTHOOK: Output: database:default +POSTHOOK: Output: default@display +PREHOOK: query: CREATE EXTERNAL TABLE calls_v2 ( + s_key bigint, + year int +) PARTITIONED BY SPEC (year) +STORED BY Iceberg STORED AS parquet +TBLPROPERTIES ('format-version'='2') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@calls_v2 +POSTHOOK: query: CREATE EXTERNAL TABLE calls_v2 ( + s_key bigint, + year int +) PARTITIONED BY SPEC (year) +STORED BY Iceberg STORED AS parquet +TBLPROPERTIES ('format-version'='2') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@calls_v2 +PREHOOK: query: INSERT INTO calls_v2 (s_key, year) VALUES (1, 2022), (2, 2023), (3, 2024), (4, 2024), (5, 2025), (1, 2022), (2, 2023), +(3, 2024), (4, 2024), (5, 2025) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@calls_v2 +POSTHOOK: query: INSERT INTO calls_v2 (s_key, year) VALUES (1, 2022), (2, 2023), (3, 2024), (4, 2024), (5, 2025), (1, 2022), (2, 2023), +(3, 2024), (4, 2024), (5, 2025) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@calls_v2 +PREHOOK: query: update calls_v2 set s_key=10 where year=2023 +PREHOOK: type: QUERY +PREHOOK: Input: default@calls_v2 +PREHOOK: Output: default@calls_v2 +PREHOOK: Output: default@calls_v2 +POSTHOOK: query: update calls_v2 set s_key=10 where year=2023 +POSTHOOK: type: QUERY +POSTHOOK: 
Input: default@calls_v2 +POSTHOOK: Output: default@calls_v2 +POSTHOOK: Output: default@calls_v2 +PREHOOK: query: delete from calls_v2 where year=2024 +PREHOOK: type: QUERY +PREHOOK: Input: default@calls_v2 +PREHOOK: Output: default@calls_v2 +POSTHOOK: query: delete from calls_v2 where year=2024 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@calls_v2 +POSTHOOK: Output: default@calls_v2 +PREHOOK: query: select s.operation, s.summary['added-records'], s.summary['deleted-records'] from default.calls_v2.snapshots s + order by s.snapshot_id +PREHOOK: type: QUERY +PREHOOK: Input: default@calls_v2 +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select s.operation, s.summary['added-records'], s.summary['deleted-records'] from default.calls_v2.snapshots s + order by s.snapshot_id +POSTHOOK: type: QUERY +POSTHOOK: Input: default@calls_v2 +POSTHOOK: Output: hdfs://### HDFS PATH ### +append 10 NULL +overwrite 2 NULL +overwrite NULL NULL +PREHOOK: query: DROP TABLE calls_v2 +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@calls_v2 +PREHOOK: Output: database:default +PREHOOK: Output: default@calls_v2 +POSTHOOK: query: DROP TABLE calls_v2 +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@calls_v2 +POSTHOOK: Output: database:default +POSTHOOK: Output: default@calls_v2 diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java index 94e8e0edc2b..1b1c6e23b48 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java @@ -35,6 +35,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import org.antlr.runtime.TokenRewriteStream; +import org.apache.commons.compress.utils.Lists; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.ContentSummary; @@ -124,7 +125,7 @@ public class Context { // Some statements, e.g., UPDATE, DELETE, or MERGE, get rewritten into different // subqueries that create new contexts. We keep them here so we can clean them // up when we are done. - private final Set<Context> subContexts; + private final List<Context> subContexts; private String replPolicy; @@ -231,7 +232,7 @@ public class Context { * These ops require special handling in various places * (note that Insert into Acid table is in OTHER category) */ - public enum Operation {UPDATE, DELETE, MERGE, OTHER} + public enum Operation {UPDATE, DELETE, MERGE, IOW, OTHER} public enum DestClausePrefix { INSERT("insclause-"), UPDATE("updclause-"), DELETE("delclause-"); private final String prefix; @@ -373,7 +374,7 @@ public class Context { private Context(Configuration conf, String executionId) { this.conf = conf; this.executionId = executionId; - this.subContexts = new HashSet<>(); + this.subContexts = Lists.newArrayList(); // local & non-local tmp location is configurable. 
however it is the same across // all external file systems @@ -433,7 +434,7 @@ public class Context { this.statsSource = ctx.statsSource; this.executionIndex = ctx.executionIndex; this.viewsTokenRewriteStreams = new HashMap<>(); - this.subContexts = new HashSet<>(); + this.subContexts = Lists.newArrayList(); this.opContext = new CompilationOpContext(); this.enableUnparse = ctx.enableUnparse; this.scheduledQuery = ctx.scheduledQuery; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index 5d167695f92..9cdf7f05d22 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -34,6 +34,8 @@ import org.apache.hadoop.hive.metastore.api.InvalidOperationException; import org.apache.hadoop.hive.metastore.api.Order; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; +import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.Context.Operation; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.ddl.DDLUtils; import org.apache.hadoop.hive.ql.ddl.table.create.CreateTableDesc; @@ -1063,7 +1065,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable { private boolean checkAndCommitNatively(MoveWork moveWork, Configuration configuration) throws HiveException { String storageHandlerClass = null; Properties commitProperties = null; - boolean overwrite = false; + Operation operation = context.getOperation(); LoadTableDesc loadTableWork = moveWork.getLoadTableWork(); if (loadTableWork != null) { if (loadTableWork.isUseAppendForLoad()) { @@ -1077,7 +1079,9 @@ public class MoveTask extends Task<MoveWork> implements Serializable { storageHandlerClass = tableDesc.getProperties().getProperty( org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE); commitProperties = new Properties(tableDesc.getProperties()); - overwrite = moveWork.getLoadTableWork().isInsertOverwrite(); + if (moveWork.getLoadTableWork().isInsertOverwrite()) { + operation = Operation.IOW; + } } else if (moveWork.getLoadFileWork() != null) { // Get the info from the create table data CreateTableDesc createTableDesc = moveWork.getLoadFileWork().getCtasCreateTableDesc(); @@ -1105,7 +1109,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable { if (storageHandlerClass != null) { HiveStorageHandler storageHandler = HiveUtils.getStorageHandler(configuration, storageHandlerClass); if (storageHandler.commitInMoveTask()) { - storageHandler.storageHandlerCommit(commitProperties, overwrite); + storageHandler.storageHandlerCommit(commitProperties, operation); return true; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java index 16f347a8445..c2e5a2cfa12 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java @@ -543,17 +543,22 @@ public interface HiveStorageHandler extends Configurable { default boolean commitInMoveTask() { return false; } - + /** * Commits the inserts for the non-native tables. Used in the {@link org.apache.hadoop.hive.ql.exec.MoveTask}. 
* @param commitProperties Commit properties which are needed for the handler based commit - * @param overwrite If this is an INSERT OVERWRITE then it is true + * @param operation the operation type * @throws HiveException If there is an error during commit */ - default void storageHandlerCommit(Properties commitProperties, boolean overwrite) throws HiveException { + default void storageHandlerCommit(Properties commitProperties, Operation operation) throws HiveException { throw new UnsupportedOperationException(); } + @Deprecated + default void storageHandlerCommit(Properties commitProperties, boolean overwrite) throws HiveException { + storageHandlerCommit(commitProperties, overwrite ? Operation.IOW : Operation.OTHER); + } + /** * Checks whether a certain ALTER TABLE operation is supported by the storage handler implementation. *
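
Illustrative sketch (not part of the commit above): the core pattern this change introduces in HiveIcebergOutputCommitter is to gather the data and delete files of all job contexts that target the same table and commit them as one Iceberg RowDelta, validated against the snapshot the statement started from, optionally on a branch. The sketch below mirrors the new commitWrite/getSnapshotId logic under the assumption that the table, branch name, and file lists have already been resolved by the committer; names such as AtomicRowDeltaSketch and commitRowDelta are hypothetical.

    import java.util.List;
    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.DeleteFile;
    import org.apache.iceberg.RowDelta;
    import org.apache.iceberg.Snapshot;
    import org.apache.iceberg.Table;

    class AtomicRowDeltaSketch {

      void commitRowDelta(Table table, String branch,
          List<DataFile> dataFiles, List<DeleteFile> deleteFiles) {
        // One RowDelta carries every data and delete file of the MERGE/UPDATE,
        // so the whole statement becomes a single Iceberg snapshot.
        RowDelta write = table.newRowDelta();
        dataFiles.forEach(write::addRows);
        deleteFiles.forEach(write::addDeletes);
        if (branch != null) {
          write.toBranch(branch);
        }
        // Validate against the snapshot the statement read from; a concurrent
        // writer touching the same files then fails the commit with a
        // ValidationException instead of silently losing changes.
        Long snapshotId = snapshotId(table, branch);
        if (snapshotId != null) {
          write.validateFromSnapshot(snapshotId);
        }
        if (!dataFiles.isEmpty()) {
          write.validateDeletedFiles();
          write.validateNoConflictingDeleteFiles();
        }
        write.validateNoConflictingDataFiles();
        write.commit(); // atomic commit for the whole MERGE/UPDATE
      }

      // Resolve the starting snapshot: the branch head if a branch is targeted,
      // otherwise the table's current snapshot (null for an empty table).
      private Long snapshotId(Table table, String branch) {
        if (branch != null && table.refs().get(branch) != null) {
          return table.refs().get(branch).snapshotId();
        }
        Snapshot current = table.currentSnapshot();
        return current != null ? current.snapshotId() : null;
      }
    }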