This is an automated email from the ASF dual-hosted git repository.

sankarh pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/branch-3 by this push:
     new 597dc69a85e HIVE-27388: Backport of HIVE-23058: Compaction task reattempt fails with FileAlreadyExistsException (Riju Trivedi, reviewed by Laszlo Pinter)
597dc69a85e is described below

commit 597dc69a85ec487983a2b12af8e29d24fc61ff04
Author: Diksha628 <43694846+diksha...@users.noreply.github.com>
AuthorDate: Tue Sep 12 12:34:06 2023 +0530

    HIVE-27388: Backport of HIVE-23058: Compaction task reattempt fails with FileAlreadyExistsException (Riju Trivedi, reviewed by Laszlo Pinter)
    
    Signed-off-by: Sankar Hariappan <sank...@apache.org>
    Closes (#4659)
---
 .../hive/ql/txn/compactor/TestCompactor.java       | 60 +++++++++++++++++++---
 .../hadoop/hive/ql/txn/compactor/CompactorMR.java  | 13 ++++-
 2 files changed, 64 insertions(+), 9 deletions(-)

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index 0827bcdb695..c0cf05ea3d0 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -24,14 +24,7 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileWriter;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.SortedSet;
-import java.util.TreeSet;
+import java.util.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -1602,6 +1595,57 @@ public class TestCompactor {
       0L, 0L, 1);
   }
 
+  @Test
+  public void testCompactionForFileInSratchDir() throws Exception {
+    String dbName = "default";
+    String tblName = "cfs";
+    String columnNamesProperty = "a,b";
+    String columnTypesProperty = "int:string";
+    String createQuery = "CREATE TABLE " + tblName + "(a INT, b STRING) " + "STORED AS ORC  TBLPROPERTIES ('transactional'='true',"
+            + "'transactional_properties'='default')";
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver(createQuery, driver);
+
+
+
+    // Insert some data -> this will generate only insert deltas
+    executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) VALUES(1, 'foo')", driver);
+
+    // Insert some data -> this will again generate only insert deltas
+    executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) VALUES(2, 'bar')", driver);
+
+    // Find the location of the table
+    IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+    Table table = msClient.getTable(dbName, tblName);
+    FileSystem fs = FileSystem.get(conf);
+
+    Map<String, String> tblProperties = new HashMap<>();
+    tblProperties.put("compactor.hive.compactor.input.tmp.dir",table.getSd().getLocation() + "/" + "_tmp");
+
+    //Create empty file in ScratchDir under table location
+    String scratchDirPath = table.getSd().getLocation() + "/" + "_tmp";
+    Path dir = new Path(scratchDirPath + "/base_0000002_v0000005");
+    fs.mkdirs(dir);
+    Path emptyFile = AcidUtils.createBucketFile(dir, 0);
+    fs.create(emptyFile);
+
+    //Run MajorCompaction
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+    Worker t = new Worker();
+    t.setThreadId((int) t.getId());
+    t.setConf(conf);
+    t.init(new AtomicBoolean(true), new AtomicBoolean());
+    CompactionRequest Cr = new CompactionRequest(dbName, tblName, CompactionType.MAJOR);
+    Cr.setProperties(tblProperties);
+    txnHandler.compact(Cr);
+    t.run();
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(1, rsp.getCompacts().size());
+    Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState());
+
+  }
+
   @Test
   public void minorCompactWhileStreamingWithSplitUpdate() throws Exception {
     String dbName = "default";
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index d7e661bcd26..e3ceb3af055 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -1028,7 +1028,18 @@ public class CompactorMR {
         AcidOutputFormat<WritableComparable, V> aof =
             instantiate(AcidOutputFormat.class, jobConf.get(OUTPUT_FORMAT_CLASS_NAME));
 
-        writer = aof.getRawRecordWriter(new Path(jobConf.get(TMP_LOCATION)), options);
+        Path rootDir = new Path(jobConf.get(TMP_LOCATION));
+        cleanupTmpLocationOnTaskRetry(options, rootDir);
+        writer = aof.getRawRecordWriter(rootDir, options);
+      }
+   }
+
+    private void cleanupTmpLocationOnTaskRetry(AcidOutputFormat.Options options, Path rootDir) throws IOException {
+      Path tmpLocation = AcidUtils.createFilename(rootDir, options);
+      FileSystem fs = tmpLocation.getFileSystem(jobConf);
+
+      if (fs.exists(tmpLocation)) {
+        fs.delete(tmpLocation, true);
       }
     }
 

Reply via email to