This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 96d46dc36cf HIVE-27951: hcatalog dynamic partitioning fails with partition already exist error when exist parent partitions path (#4937) 96d46dc36cf is described below commit 96d46dc36cfd3a68c73f8c77e1f97c1c78507b24 Author: yigress <104102129+yigr...@users.noreply.github.com> AuthorDate: Wed Jan 3 17:03:59 2024 -0800 HIVE-27951: hcatalog dynamic partitioning fails with partition already exist error when exist parent partitions path (#4937) --- .../mapreduce/FileOutputCommitterContainer.java | 37 ++++++++++++++-------- .../mapreduce/TestHCatDynamicPartitioned.java | 17 ++++++---- .../TestHCatExternalDynamicPartitioned.java | 4 +-- 3 files changed, 37 insertions(+), 21 deletions(-) diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java index de9ad252ff2..2ad306165d1 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java @@ -488,7 +488,7 @@ class FileOutputCommitterContainer extends OutputCommitterContainer { } /** - * Move all of the files from the temp directory to the final location + * Move task output from the temp directory to the final location * @param srcf the file to move * @param srcDir the source directory * @param destDir the target directory @@ -538,17 +538,17 @@ class FileOutputCommitterContainer extends OutputCommitterContainer { final Path finalOutputPath = getFinalPath(destFs, srcF, srcDir, destDir, immutable); if (immutable && destFs.exists(finalOutputPath) && !org.apache.hadoop.hive.metastore.utils.FileUtils.isDirEmpty(destFs, finalOutputPath)) { - throw new HCatException(ErrorType.ERROR_DUPLICATE_PARTITION, - "Data already exists in " + finalOutputPath - + ", duplicate 
publish not possible."); - } - if (srcStatus.isDirectory()) { + if (partitionsDiscoveredByPath.containsKey(srcF.toString())) { + throw new HCatException(ErrorType.ERROR_DUPLICATE_PARTITION, + "Data already exists in " + finalOutputPath + + ", duplicate publish not possible."); + } + // parent directory may exist for multi-partitions, check lower level partitions + Collections.addAll(srcQ, srcFs.listStatus(srcF,HIDDEN_FILES_PATH_FILTER)); + } else if (srcStatus.isDirectory()) { if (canRename && dynamicPartitioningUsed) { // If it is partition, move the partition directory instead of each file. - // If custom dynamic location provided, need to rename to final output path - final Path parentDir = finalOutputPath.getParent(); - Path dstPath = !customDynamicLocationUsed ? parentDir : finalOutputPath; - moves.add(Pair.of(srcF, dstPath)); + moves.add(Pair.of(srcF, finalOutputPath)); } else { Collections.addAll(srcQ, srcFs.listStatus(srcF, HIDDEN_FILES_PATH_FILTER)); } @@ -558,16 +558,27 @@ class FileOutputCommitterContainer extends OutputCommitterContainer { } } - if (moves.isEmpty()) { + bulkMoveFiles(conf, srcFs, destFs, moves); + } + + /** + * Bulk move files from source to destination. + * @param srcFs the source filesystem where the source files are + * @param destFs the destination filesystem where the destination files are + * @param pairs list of pairs of <source_path, destination_path>, move source_path to destination_path + * @throws java.io.IOException + */ + private void bulkMoveFiles(final Configuration conf, final FileSystem srcFs, final FileSystem destFs, List<Pair<Path, Path>> pairs) throws IOException{ + if (pairs.isEmpty()) { return; } - + final boolean canRename = srcFs.getUri().equals(destFs.getUri()); final List<Future<Pair<Path, Path>>> futures = new LinkedList<>(); final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ? 
Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25), new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Move-Thread-%d").build()) : null; - for (final Pair<Path, Path> pair: moves){ + for (final Pair<Path, Path> pair: pairs){ Path srcP = pair.getLeft(); Path dstP = pair.getRight(); final String msg = "Unable to move source " + srcP + " to destination " + dstP; diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java index a97162de993..5ee3a6348d1 100644 --- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java +++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java @@ -52,13 +52,13 @@ public class TestHCatDynamicPartitioned extends HCatMapReduceTest { private static List<HCatFieldSchema> dataColumns; private static final Logger LOG = LoggerFactory.getLogger(TestHCatDynamicPartitioned.class); protected static final int NUM_RECORDS = 20; - protected static final int NUM_PARTITIONS = 5; + protected static final int NUM_TOP_PARTITIONS = 5; public TestHCatDynamicPartitioned(String formatName, String serdeClass, String inputFormatClass, String outputFormatClass) throws Exception { super(formatName, serdeClass, inputFormatClass, outputFormatClass); tableName = "testHCatDynamicPartitionedTable_" + formatName; - generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0); + generateWriteRecords(NUM_RECORDS, NUM_TOP_PARTITIONS, 0); generateDataColumns(); } @@ -67,6 +67,8 @@ public class TestHCatDynamicPartitioned extends HCatMapReduceTest { dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", serdeConstants.INT_TYPE_NAME, ""))); dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", serdeConstants.STRING_TYPE_NAME, ""))); 
dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("p1", serdeConstants.STRING_TYPE_NAME, ""))); + dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("p2", serdeConstants.STRING_TYPE_NAME, ""))); + } protected static void generateWriteRecords(int max, int mod, int offset) { @@ -78,6 +80,7 @@ public class TestHCatDynamicPartitioned extends HCatMapReduceTest { objList.add(i); objList.add("strvalue" + i); objList.add(String.valueOf((i % mod) + offset)); + objList.add(String.valueOf((i / (max/2)) + offset)); writeRecords.add(new DefaultHCatRecord(objList)); } } @@ -86,6 +89,7 @@ public class TestHCatDynamicPartitioned extends HCatMapReduceTest { protected List<FieldSchema> getPartitionKeys() { List<FieldSchema> fields = new ArrayList<FieldSchema>(); fields.add(new FieldSchema("p1", serdeConstants.STRING_TYPE_NAME, "")); + fields.add(new FieldSchema("p2", serdeConstants.STRING_TYPE_NAME, "")); return fields; } @@ -117,8 +121,9 @@ public class TestHCatDynamicPartitioned extends HCatMapReduceTest { protected void runHCatDynamicPartitionedTable(boolean asSingleMapTask, String customDynamicPathPattern) throws Exception { - generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0); - runMRCreate(null, dataColumns, writeRecords, NUM_RECORDS, true, asSingleMapTask, customDynamicPathPattern); + generateWriteRecords(NUM_RECORDS, NUM_TOP_PARTITIONS, 0); + runMRCreate(null, dataColumns, writeRecords.subList(0,NUM_RECORDS/2), NUM_RECORDS/2, true, asSingleMapTask, customDynamicPathPattern); + runMRCreate(null, dataColumns, writeRecords.subList(NUM_RECORDS/2,NUM_RECORDS), NUM_RECORDS/2, true, asSingleMapTask, customDynamicPathPattern); runMRRead(NUM_RECORDS); @@ -140,7 +145,7 @@ public class TestHCatDynamicPartitioned extends HCatMapReduceTest { //Test for duplicate publish IOException exc = null; try { - generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0); + generateWriteRecords(NUM_RECORDS, NUM_TOP_PARTITIONS, 0); Job job = runMRCreate(null, dataColumns, 
writeRecords, NUM_RECORDS, false, true, customDynamicPathPattern); @@ -167,7 +172,7 @@ class TestHCatDynamicPartitioned extends HCatMapReduceTest { driver.run(query); res = new ArrayList<String>(); driver.getResults(res); - assertEquals(NUM_PARTITIONS, res.size()); + assertEquals(NUM_TOP_PARTITIONS*2, res.size()); query = "select * from " + tableName; driver.run(query); diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalDynamicPartitioned.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalDynamicPartitioned.java index 18fcfdbdd2a..9698f178a8e 100644 --- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalDynamicPartitioned.java +++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalDynamicPartitioned.java @@ -28,7 +28,7 @@ public class TestHCatExternalDynamicPartitioned extends TestHCatDynamicPartition throws Exception { super(formatName, serdeClass, inputFormatClass, outputFormatClass); tableName = "testHCatExternalDynamicPartitionedTable_" + formatName; - generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0); + generateWriteRecords(NUM_RECORDS, NUM_TOP_PARTITIONS, 0); generateDataColumns(); } @@ -43,7 +43,7 @@ public class TestHCatExternalDynamicPartitioned extends TestHCatDynamicPartition */ @Test public void testHCatExternalDynamicCustomLocation() throws Exception { - runHCatDynamicPartitionedTable(true, "mapred/externalDynamicOutput/${p1}/${p2}"); } }