This is an automated email from the ASF dual-hosted git repository. sankarh pushed a commit to branch branch-3 in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/branch-3 by this push: new 597dc69a85e HIVE-27388: Backport of HIVE-23058: Compaction task reattempt fails with FileAlreadyExistsException (Riju Trivedi, reviewed by Laszlo Pinter) 597dc69a85e is described below commit 597dc69a85ec487983a2b12af8e29d24fc61ff04 Author: Diksha628 <43694846+diksha...@users.noreply.github.com> AuthorDate: Tue Sep 12 12:34:06 2023 +0530 HIVE-27388: Backport of HIVE-23058: Compaction task reattempt fails with FileAlreadyExistsException (Riju Trivedi, reviewed by Laszlo Pinter) Signed-off-by: Sankar Hariappan <sank...@apache.org> Closes (#4659) --- .../hive/ql/txn/compactor/TestCompactor.java | 60 +++++++++++++++++++--- .../hadoop/hive/ql/txn/compactor/CompactorMR.java | 13 ++++- 2 files changed, 64 insertions(+), 9 deletions(-) diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java index 0827bcdb695..c0cf05ea3d0 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java @@ -24,14 +24,7 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.FileWriter; import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.SortedSet; -import java.util.TreeSet; +import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -1602,6 +1595,57 @@ public class TestCompactor { 0L, 0L, 1); } + @Test + public void testCompactionForFileInSratchDir() throws Exception { + String dbName = "default"; + String tblName = "cfs"; + String columnNamesProperty = "a,b"; + String columnTypesProperty = "int:string"; + String createQuery = "CREATE TABLE " + tblName + "(a INT, b STRING) " + "STORED AS ORC TBLPROPERTIES ('transactional'='true'," + + "'transactional_properties'='default')"; + executeStatementOnDriver("drop table if exists " + tblName, driver); + executeStatementOnDriver(createQuery, driver); + + + + // Insert some data -> this will generate only insert deltas + executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) VALUES(1, 'foo')", driver); + + // Insert some data -> this will again generate only insert deltas + executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) VALUES(2, 'bar')", driver); + + // Find the location of the table + IMetaStoreClient msClient = new HiveMetaStoreClient(conf); + Table table = msClient.getTable(dbName, tblName); + FileSystem fs = FileSystem.get(conf); + + Map<String, String> tblProperties = new HashMap<>(); + tblProperties.put("compactor.hive.compactor.input.tmp.dir",table.getSd().getLocation() + "/" + "_tmp"); + + //Create empty file in ScratchDir under table location + String scratchDirPath = table.getSd().getLocation() + "/" + "_tmp"; + Path dir = new Path(scratchDirPath + "/base_0000002_v0000005"); + fs.mkdirs(dir); + Path emptyFile = AcidUtils.createBucketFile(dir, 0); + fs.create(emptyFile); + + //Run MajorCompaction + TxnStore txnHandler = TxnUtils.getTxnStore(conf); + Worker t = new Worker(); + t.setThreadId((int) t.getId()); + t.setConf(conf); + t.init(new AtomicBoolean(true), new AtomicBoolean()); + CompactionRequest Cr = new CompactionRequest(dbName, tblName, CompactionType.MAJOR); + Cr.setProperties(tblProperties); + txnHandler.compact(Cr); + t.run(); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + Assert.assertEquals(1, rsp.getCompacts().size()); + Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState()); + + } + @Test public void minorCompactWhileStreamingWithSplitUpdate() throws Exception { String dbName = "default"; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index d7e661bcd26..e3ceb3af055 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -1028,7 +1028,18 @@ public class CompactorMR { AcidOutputFormat<WritableComparable, V> aof = instantiate(AcidOutputFormat.class, jobConf.get(OUTPUT_FORMAT_CLASS_NAME)); - writer = aof.getRawRecordWriter(new Path(jobConf.get(TMP_LOCATION)), options); + Path rootDir = new Path(jobConf.get(TMP_LOCATION)); + cleanupTmpLocationOnTaskRetry(options, rootDir); + writer = aof.getRawRecordWriter(rootDir, options); + } + } + + private void cleanupTmpLocationOnTaskRetry(AcidOutputFormat.Options options, Path rootDir) throws IOException { + Path tmpLocation = AcidUtils.createFilename(rootDir, options); + FileSystem fs = tmpLocation.getFileSystem(jobConf); + + if (fs.exists(tmpLocation)) { + fs.delete(tmpLocation, true); } }