Repository: hive
Updated Branches:
  refs/heads/master d2cb97b6f -> 53df7e881


HIVE-18832: Support change management for trashing data files from ACID tables (Anishek Agarwal, reviewed by Sankar Hariappan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/53df7e88
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/53df7e88
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/53df7e88

Branch: refs/heads/master
Commit: 53df7e881723827a29a783fcbff67a16929950ec
Parents: d2cb97b
Author: Anishek Agarwal <anis...@gmail.com>
Authored: Mon Mar 12 11:55:42 2018 +0530
Committer: Anishek Agarwal <anis...@gmail.com>
Committed: Mon Mar 12 11:55:42 2018 +0530

----------------------------------------------------------------------
 .../hadoop/hive/ql/parse/WarehouseInstance.java |   8 +-
 .../compactor/TestCleanerWithReplication.java   | 198 +++++++++++++++++++
 .../hadoop/hive/ql/txn/compactor/Cleaner.java   |  25 ++-
 .../hive/ql/txn/compactor/CompactorTest.java    |   9 +-
 .../hive/ql/txn/compactor/TestCleaner.java      |   8 -
 .../hive/ql/txn/compactor/TestCleaner2.java     |   3 -
 .../hive/ql/txn/compactor/TestInitiator.java    |   8 -
 .../hive/ql/txn/compactor/TestWorker.java       |   4 -
 .../hive/ql/txn/compactor/TestWorker2.java      |   4 -
 .../hive/metastore/ReplChangeManager.java       | 163 ++++++++-------
 .../apache/hadoop/hive/metastore/Warehouse.java |  20 +-
 11 files changed, 319 insertions(+), 131 deletions(-)
----------------------------------------------------------------------
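In short: with change management (CM) enabled, the compaction Cleaner now recycles obsolete base/delta files into the CM root before deleting them, so replication can still copy data that a compaction has already cleaned up. A minimal sketch of the new flow, condensed from the Cleaner.removeFiles() hunk below (imports and error handling omitted):

    // Recycle each obsolete compaction file into cmroot, then delete the original.
    ReplChangeManager cm = ReplChangeManager.getInstance(conf);
    for (Path dead : filesToDelete) {
      cm.recycle(dead, ReplChangeManager.RecycleType.MOVE, true); // ifPurge = true: skip Trash
      fs.delete(dead, true);
    }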


http://git-wip-us.apache.org/repos/asf/hive/blob/53df7e88/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
----------------------------------------------------------------------
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index 33e5157..feb1191 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -56,7 +56,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-class WarehouseInstance implements Closeable {
+public class WarehouseInstance implements Closeable {
   final String functionsRoot;
   private Logger logger;
   private IDriver driver;
@@ -85,7 +85,7 @@ class WarehouseInstance implements Closeable {
     initialize(cmRootPath.toString(), warehouseRoot.toString(), overridesForHiveConf);
   }
 
-  WarehouseInstance(Logger logger, MiniDFSCluster cluster,
+  public WarehouseInstance(Logger logger, MiniDFSCluster cluster,
       Map<String, String> overridesForHiveConf) throws Exception {
     this(logger, cluster, overridesForHiveConf, null);
   }
@@ -165,7 +165,7 @@ class WarehouseInstance implements Closeable {
     return (lastResults.get(0).split("\\t"))[colNum];
   }
 
-  WarehouseInstance run(String command) throws Throwable {
+  public WarehouseInstance run(String command) throws Throwable {
     CommandProcessorResponse ret = driver.run(command);
     if (ret.getException() != null) {
       throw ret.getException();
@@ -257,7 +257,7 @@ class WarehouseInstance implements Closeable {
     return this;
   }
 
-  List<String> getOutput() throws IOException {
+  public List<String> getOutput() throws IOException {
     List<String> results = new ArrayList<>();
     driver.getResults(results);
     return results;

http://git-wip-us.apache.org/repos/asf/hive/blob/53df7e88/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
----------------------------------------------------------------------
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
new file mode 100644
index 0000000..c0751a7
--- /dev/null
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.shims.Utils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.security.auth.login.LoginException;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestCleanerWithReplication extends CompactorTest {
+  private Path cmRootDirectory;
+  private static FileSystem fs;
+  private static MiniDFSCluster miniDFSCluster;
+
+  @Before
+  public void setup() throws Exception {
+    conf = new HiveConf();
+    TxnDbUtil.setConfValues(conf);
+    TxnDbUtil.cleanDb(conf);
+    conf.set("fs.defaultFS", fs.getUri().toString());
+    conf.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, true);
+    ms = new HiveMetaStoreClient(conf);
+    txnHandler = TxnUtils.getTxnStore(conf);
+    cmRootDirectory = new Path(conf.get(HiveConf.ConfVars.REPLCMDIR.varname));
+    if (!fs.exists(cmRootDirectory)) {
+      fs.mkdirs(cmRootDirectory);
+    }
+    tmpdir = new File(Files.createTempDirectory("compactor_test_table_").toString());
+  }
+
+  @BeforeClass
+  public static void classLevelSetup() throws LoginException, IOException {
+    Configuration hadoopConf = new Configuration();
+    hadoopConf.set("dfs.client.use.datanode.hostname", "true");
+    hadoopConf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + 
".hosts", "*");
+    miniDFSCluster =
+        new MiniDFSCluster.Builder(hadoopConf).numDataNodes(1).format(true).build();
+    fs = miniDFSCluster.getFileSystem();
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    fs.delete(cmRootDirectory, true);
+    compactorTestCleanup();
+  }
+
+  @AfterClass
+  public static void tearDownClass() {
+    miniDFSCluster.shutdown();
+  }
+
+  @Test
+  public void cleanupAfterMajorTableCompaction() throws Exception {
+    Table t = newTable("default", "camtc", false);
+
+    addBaseFile(t, null, 20L, 20);
+    addDeltaFile(t, null, 21L, 22L, 2);
+    addDeltaFile(t, null, 23L, 24L, 2);
+    addBaseFile(t, null, 25L, 25);
+
+    burnThroughTransactions("default", "camtc", 25);
+
+    CompactionRequest rqst = new CompactionRequest("default", "camtc", CompactionType.MAJOR);
+    txnHandler.compact(rqst);
+    CompactionInfo ci = txnHandler.findNextToCompact("fred");
+    txnHandler.markCompacted(ci);
+    txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
+
+    assertCleanerActions(6);
+  }
+
+  @Test
+  public void cleanupAfterMajorPartitionCompaction() throws Exception {
+    Table t = newTable("default", "campc", true);
+    Partition p = newPartition(t, "today");
+
+    addBaseFile(t, p, 20L, 20);
+    addDeltaFile(t, p, 21L, 22L, 2);
+    addDeltaFile(t, p, 23L, 24L, 2);
+    addBaseFile(t, p, 25L, 25);
+
+    burnThroughTransactions("default", "campc", 25);
+
+    CompactionRequest rqst = new CompactionRequest("default", "campc", CompactionType.MAJOR);
+    rqst.setPartitionname("ds=today");
+    txnHandler.compact(rqst);
+    CompactionInfo ci = txnHandler.findNextToCompact("fred");
+    txnHandler.markCompacted(ci);
+    txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
+
+    assertCleanerActions(6);
+  }
+
+  @Test
+  public void cleanupAfterMinorTableCompaction() throws Exception {
+    Table t = newTable("default", "camitc", false);
+
+    addBaseFile(t, null, 20L, 20);
+    addDeltaFile(t, null, 21L, 22L, 2);
+    addDeltaFile(t, null, 23L, 24L, 2);
+    addDeltaFile(t, null, 21L, 24L, 4);
+
+    burnThroughTransactions("default", "camitc", 25);
+
+    CompactionRequest rqst = new CompactionRequest("default", "camitc", CompactionType.MINOR);
+    txnHandler.compact(rqst);
+    CompactionInfo ci = txnHandler.findNextToCompact("fred");
+    txnHandler.markCompacted(ci);
+    txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
+
+    assertCleanerActions(4);
+  }
+
+  @Test
+  public void cleanupAfterMinorPartitionCompaction() throws Exception {
+    Table t = newTable("default", "camipc", true);
+    Partition p = newPartition(t, "today");
+
+    addBaseFile(t, p, 20L, 20);
+    addDeltaFile(t, p, 21L, 22L, 2);
+    addDeltaFile(t, p, 23L, 24L, 2);
+    addDeltaFile(t, p, 21L, 24L, 4);
+
+    burnThroughTransactions("default", "camipc", 25);
+
+    CompactionRequest rqst = new CompactionRequest("default", "camipc", CompactionType.MINOR);
+    rqst.setPartitionname("ds=today");
+    txnHandler.compact(rqst);
+    CompactionInfo ci = txnHandler.findNextToCompact("fred");
+    txnHandler.markCompacted(ci);
+    txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
+
+    assertCleanerActions(4);
+  }
+
+  private void assertCleanerActions(int expectedNumOfCleanedFiles) throws Exception {
+    assertEquals("there should be no deleted files in cm root", 0,
+        fs.listStatus(cmRootDirectory).length);
+
+    startCleaner();
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(1, rsp.getCompactsSize());
+    String state = rsp.getCompacts().get(0).getState();
+    Assert.assertTrue("unexpected state " + state, 
TxnStore.SUCCEEDED_RESPONSE.equals(state));
+
+    assertEquals(
+        "there should be " + String.valueOf(expectedNumOCleanedFiles) + " 
deleted files in cm root",
+        expectedNumOCleanedFiles, fs.listStatus(cmRootDirectory).length
+    );
+  }
+
+  @Override
+  boolean useHive130DeltaDirName() {
+    return false;
+  }
+}
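The tests above hinge on two prerequisites that setup() establishes: change management must be enabled and the CM root directory must exist. A condensed sketch of that configuration (the FileSystem lookup is an assumption for self-containment; the test itself uses the MiniDFSCluster's fs):

    HiveConf conf = new HiveConf();
    conf.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, true);   // turn on change management
    Path cmRoot = new Path(conf.get(HiveConf.ConfVars.REPLCMDIR.varname));
    FileSystem fs = cmRoot.getFileSystem(conf);               // assumed lookup for this sketch
    if (!fs.exists(cmRoot)) {
      fs.mkdirs(cmRoot);                                      // recycled files land here
    }

assertCleanerActions() then verifies the whole pipeline: cmroot starts empty, the Cleaner runs, the compaction ends in the succeeded state, and exactly the expected number of recycled files appears under cmroot.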

http://git-wip-us.apache.org/repos/asf/hive/blob/53df7e88/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index af0884c..df9a5a0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;
 
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,6 +49,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
 * A class to clean directories after compactions.  This will run in a separate thread.
@@ -55,12 +57,18 @@ import java.util.concurrent.TimeUnit;
 public class Cleaner extends CompactorThread {
   static final private String CLASS_NAME = Cleaner.class.getName();
   static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
-
   private long cleanerCheckInterval = 0;
 
+  private ReplChangeManager replChangeManager;
   // List of compactions to clean.
-  private Map<Long, Set<Long>> compactId2LockMap = new HashMap<Long, 
Set<Long>>();
-  private Map<Long, CompactionInfo> compactId2CompactInfoMap = new 
HashMap<Long, CompactionInfo>();
+  private Map<Long, Set<Long>> compactId2LockMap = new HashMap<>();
+  private Map<Long, CompactionInfo> compactId2CompactInfoMap = new HashMap<>();
+
+  @Override
+  public void init(AtomicBoolean stop, AtomicBoolean looped) throws MetaException {
+    super.init(stop, looped);
+    replChangeManager = ReplChangeManager.getInstance(conf);
+  }
 
   @Override
   public void run() {
@@ -245,10 +253,10 @@ public class Cleaner extends CompactorThread {
       * Each Compaction only compacts as far as the highest txn id such that all txns below it
       * are resolved (i.e. not opened).  This is what "highestWriteId" tracks.  This is only tracked
       * since Hive 1.3.0/2.0 - thus may be 0.  See ValidCompactorWriteIdList and uses for more info.
-       * 
+       *
       * We only want to clean up to the highestWriteId - otherwise we risk deleting deltas from
        * under an active reader.
-       * 
+       *
       * Suppose we have deltas D2 D3 for table T, i.e. the last compaction created D3 so now there is a
        * clean request for D2.  
        * Cleaner checks existing locks and finds none.
@@ -258,8 +266,9 @@ public class Cleaner extends CompactorThread {
        * unless ValidTxnList is "capped" at highestWriteId.
        */
       final ValidWriteIdList txnList = (ci.highestWriteId > 0)
-              ? new ValidReaderWriteIdList(ci.getFullTableName(), new long[0], new BitSet(), ci.highestWriteId)
-              : new ValidReaderWriteIdList();
+          ? new ValidReaderWriteIdList(ci.getFullTableName(), new long[0], new BitSet(),
+          ci.highestWriteId)
+          : new ValidReaderWriteIdList();
 
       if (runJobAsSelf(ci.runAs)) {
         removeFiles(location, txnList);
@@ -306,8 +315,8 @@ public class Cleaner extends CompactorThread {
 
     for (Path dead : filesToDelete) {
       LOG.debug("Going to delete path " + dead.toString());
+      replChangeManager.recycle(dead, ReplChangeManager.RecycleType.MOVE, true);
       fs.delete(dead, true);
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/53df7e88/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index 7ea017a..083c671 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.mapred.RecordWriter;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.Progressable;
 import org.apache.thrift.TException;
+import org.junit.Before;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -92,19 +93,19 @@ public abstract class CompactorTest {
 
   protected TxnStore txnHandler;
   protected IMetaStoreClient ms;
-  protected long sleepTime = 1000;
   protected HiveConf conf;
 
   private final AtomicBoolean stop = new AtomicBoolean();
-  private final File tmpdir;
+  protected File tmpdir;
 
-  protected CompactorTest() throws Exception {
+  @Before
+  public void setup() throws Exception {
     conf = new HiveConf();
     TxnDbUtil.setConfValues(conf);
     TxnDbUtil.cleanDb(conf);
     ms = new HiveMetaStoreClient(conf);
     txnHandler = TxnUtils.getTxnStore(conf);
-    tmpdir = new File (Files.createTempDirectory("compactor_test_table_").toString());
+    tmpdir = new File(Files.createTempDirectory("compactor_test_table_").toString());
   }
 
   protected void compactorTestCleanup() throws IOException {

http://git-wip-us.apache.org/repos/asf/hive/blob/53df7e88/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index 3ca073c..ce574b4 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -40,8 +40,6 @@ import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -54,12 +52,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
  */
 public class TestCleaner extends CompactorTest {
 
-  static final private Logger LOG = LoggerFactory.getLogger(TestCleaner.class.getName());
-
-  public TestCleaner() throws Exception {
-    super();
-  }
-
   @Test
   public void nothing() throws Exception {
     // Test that the whole things works when there's nothing in the queue.  This is just a

http://git-wip-us.apache.org/repos/asf/hive/blob/53df7e88/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner2.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner2.java
index 1cbd9bc..e2aeb9c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner2.java
@@ -21,9 +21,6 @@ package org.apache.hadoop.hive.ql.txn.compactor;
  * Same as TestCleaner but tests delta file names in Hive 1.3.0 format 
  */
 public class TestCleaner2 extends TestCleaner {
-  public TestCleaner2() throws Exception {
-    super();
-  }
   @Override
   boolean useHive130DeltaDirName() {
     return false;

http://git-wip-us.apache.org/repos/asf/hive/blob/53df7e88/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
index d2818db..5648393 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
@@ -41,8 +41,6 @@ import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -55,12 +53,6 @@ import java.util.concurrent.TimeUnit;
  * Tests for the compactor Initiator thread.
  */
 public class TestInitiator extends CompactorTest {
-  static final private String CLASS_NAME = TestInitiator.class.getName();
-  static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
-
-  public TestInitiator() throws Exception {
-    super();
-  }
 
   @Test
   public void nothing() throws Exception {

http://git-wip-us.apache.org/repos/asf/hive/blob/53df7e88/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
index 1d9c9a7..488cd90 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
@@ -65,10 +65,6 @@ public class TestWorker extends CompactorTest {
   static final private String CLASS_NAME = TestWorker.class.getName();
   static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
 
-  public TestWorker() throws Exception {
-    super();
-  }
-
   @Test
   public void nothing() throws Exception {
     // Test that the whole things works when there's nothing in the queue.  This is just a

http://git-wip-us.apache.org/repos/asf/hive/blob/53df7e88/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker2.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker2.java
index f07c050..b6b8788 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker2.java
@@ -22,10 +22,6 @@ package org.apache.hadoop.hive.ql.txn.compactor;
  */
 public class TestWorker2 extends TestWorker {
 
-  public TestWorker2() throws Exception {
-    super();
-  }
-
   @Override
   boolean useHive130DeltaDirName() {
     return true;

http://git-wip-us.apache.org/repos/asf/hive/blob/53df7e88/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
index 95fa0a9..7c1d5f5 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
@@ -163,104 +163,99 @@ public class ReplChangeManager {
    * @param ifPurge if the file should skip Trash when move/delete source file.
    *                This is referred only if type is MOVE.
    * @return int
-   * @throws MetaException
+   * @throws IOException
    */
-  int recycle(Path path, RecycleType type, boolean ifPurge) throws MetaException {
+  public int recycle(Path path, RecycleType type, boolean ifPurge) throws IOException {
     if (!enabled) {
       return 0;
     }
 
-    try {
-      int count = 0;
-
-      if (fs.isDirectory(path)) {
-        FileStatus[] files = fs.listStatus(path, hiddenFileFilter);
-        for (FileStatus file : files) {
-          count += recycle(file.getPath(), type, ifPurge);
-        }
+    int count = 0;
+    if (fs.isDirectory(path)) {
+      FileStatus[] files = fs.listStatus(path, hiddenFileFilter);
+      for (FileStatus file : files) {
+        count += recycle(file.getPath(), type, ifPurge);
+      }
+    } else {
+      String fileCheckSum = checksumFor(path, fs);
+      Path cmPath = getCMPath(conf, path.getName(), fileCheckSum);
+
+      // set timestamp before moving to cmroot, so we can
+      // avoid race condition CM remove the file before setting
+      // timestamp
+      long now = System.currentTimeMillis();
+      fs.setTimes(path, now, -1);
+
+      boolean success = false;
+      if (fs.exists(cmPath) && 
fileCheckSum.equalsIgnoreCase(checksumFor(cmPath, fs))) {
+        // If already a file with same checksum exists in cmPath, just ignore 
the copy/move
+        // Also, mark the operation is unsuccessful to notify that file with 
same name already
+        // exist which will ensure the timestamp of cmPath is updated to avoid 
clean-up by
+        // CM cleaner.
+        success = false;
       } else {
-        String fileCheckSum = checksumFor(path, fs);
-        Path cmPath = getCMPath(conf, path.getName(), fileCheckSum);
-
-        // set timestamp before moving to cmroot, so we can
-        // avoid race condition CM remove the file before setting
-        // timestamp
-        long now = System.currentTimeMillis();
-        fs.setTimes(path, now, -1);
-
-        boolean success = false;
-        if (fs.exists(cmPath) && 
fileCheckSum.equalsIgnoreCase(checksumFor(cmPath, fs))) {
-          // If already a file with same checksum exists in cmPath, just 
ignore the copy/move
-          // Also, mark the operation is unsuccessful to notify that file with 
same name already
-          // exist which will ensure the timestamp of cmPath is updated to 
avoid clean-up by
-          // CM cleaner.
-          success = false;
-        } else {
-          switch (type) {
-            case MOVE: {
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Moving {} to {}", path.toString(), 
cmPath.toString());
-              }
-              // Rename fails if the file with same name already exist.
-              success = fs.rename(path, cmPath);
-              break;
-            }
-            case COPY: {
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Copying {} to {}", path.toString(), 
cmPath.toString());
-              }
-              // It is possible to have a file with same checksum in cmPath but the content is
-              // partially copied or corrupted. In this case, just overwrite the existing file with
-              // new one.
-              success = FileUtils.copy(fs, path, fs, cmPath, false, true, conf);
-              break;
-            }
-            default:
-              // Operation fails as invalid input
-              break;
+        switch (type) {
+        case MOVE: {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Moving {} to {}", path.toString(), cmPath.toString());
           }
+          // Rename fails if the file with same name already exist.
+          success = fs.rename(path, cmPath);
+          break;
         }
-
-        // Ignore if a file with same content already exist in cmroot
-        // We might want to setXAttr for the new location in the future
-        if (success) {
-          // set the file owner to hive (or the id metastore run as)
-          fs.setOwner(cmPath, msUser, msGroup);
-
-          // tag the original file name so we know where the file comes from
-          // Note we currently only track the last known trace as
-          // xattr has limited capacity. We shall revisit and store all original
-          // locations if orig-loc becomes important
-          try {
-            fs.setXAttr(cmPath, ORIG_LOC_TAG, path.toString().getBytes());
-          } catch (UnsupportedOperationException e) {
-            LOG.warn("Error setting xattr for {}", path.toString());
-          }
-
-          count++;
-        } else {
+        case COPY: {
           if (LOG.isDebugEnabled()) {
-            LOG.debug("A file with the same content of {} already exists, 
ignore", path.toString());
+            LOG.debug("Copying {} to {}", path.toString(), cmPath.toString());
           }
-          // Need to extend the tenancy if we saw a newer file with the same content
-          fs.setTimes(cmPath, now, -1);
+          // It is possible to have a file with same checksum in cmPath but the content is
+          // partially copied or corrupted. In this case, just overwrite the existing file with
+          // new one.
+          success = FileUtils.copy(fs, path, fs, cmPath, false, true, conf);
+          break;
         }
+        default:
+          // Operation fails as invalid input
+          break;
+        }
+      }
 
-        // Tag if we want to remain in trash after deletion.
-        // If multiple files share the same content, then
-        // any file claim remain in trash would be granted
-        if ((type == RecycleType.MOVE) && !ifPurge) {
-          try {
-            fs.setXAttr(cmPath, REMAIN_IN_TRASH_TAG, new byte[]{0});
-          } catch (UnsupportedOperationException e) {
-            LOG.warn("Error setting xattr for {}", cmPath.toString());
-          }
+      // Ignore if a file with same content already exist in cmroot
+      // We might want to setXAttr for the new location in the future
+      if (success) {
+        // set the file owner to hive (or the id metastore run as)
+        fs.setOwner(cmPath, msUser, msGroup);
+
+        // tag the original file name so we know where the file comes from
+        // Note we currently only track the last known trace as
+        // xattr has limited capacity. We shall revisit and store all original
+        // locations if orig-loc becomes important
+        try {
+          fs.setXAttr(cmPath, ORIG_LOC_TAG, path.toString().getBytes());
+        } catch (UnsupportedOperationException e) {
+          LOG.warn("Error setting xattr for {}", path.toString());
+        }
+
+        count++;
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("A file with the same content of {} already exists, 
ignore", path.toString());
+        }
+        // Need to extend the tenancy if we saw a newer file with the same content
+        fs.setTimes(cmPath, now, -1);
+      }
+
+      // Tag if we want to remain in trash after deletion.
+      // If multiple files share the same content, then
+      // any file claim remain in trash would be granted
+      if ((type == RecycleType.MOVE) && !ifPurge) {
+        try {
+          fs.setXAttr(cmPath, REMAIN_IN_TRASH_TAG, new byte[] { 0 });
+        } catch (UnsupportedOperationException e) {
+          LOG.warn("Error setting xattr for {}", cmPath.toString());
         }
       }
-      return count;
-    } catch (IOException e) {
-      throw new MetaException(StringUtils.stringifyException(e));
     }
+    return count;
   }
 
   // Get checksum of a file
@@ -289,7 +284,7 @@ public class ReplChangeManager {
   * @param checkSum checksum of the file, can be retrieved by {@link #checksumFor(Path, FileSystem)}
    * @return Path
    */
-  static Path getCMPath(Configuration conf, String name, String checkSum) throws IOException, MetaException {
+  static Path getCMPath(Configuration conf, String name, String checkSum) {
     String newFileName = name + "_" + checkSum;
    int maxLength = conf.getInt(DFSConfigKeys.DFS_NAMENODE_MAX_COMPONENT_LENGTH_KEY,
         DFSConfigKeys.DFS_NAMENODE_MAX_COMPONENT_LENGTH_DEFAULT);
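The restructured recycle() keeps its checksum-based de-duplication: a file is moved or copied into cmroot under a name derived from its checksum, and when an identical copy already exists there, only that copy's timestamp is refreshed so the CM cleaner does not expire it. A condensed sketch of the MOVE path, using the helpers shown in the hunks above:

    String fileCheckSum = checksumFor(path, fs);                  // checksum of the source file
    Path cmPath = getCMPath(conf, path.getName(), fileCheckSum);  // "<name>_<checksum>" under cmroot
    if (fs.exists(cmPath) && fileCheckSum.equalsIgnoreCase(checksumFor(cmPath, fs))) {
      fs.setTimes(cmPath, System.currentTimeMillis(), -1);        // refresh tenancy of existing copy
    } else {
      fs.rename(path, cmPath);                                    // MOVE: rename into cmroot
    }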

http://git-wip-us.apache.org/repos/asf/hive/blob/53df7e88/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java
index 445a7b8..d4a0819 100755
--- 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java
@@ -220,7 +220,11 @@ public class Warehouse {
   }
 
   void addToChangeManagement(Path file) throws MetaException {
-    cm.recycle(file, RecycleType.COPY, true);
+    try {
+      cm.recycle(file, RecycleType.COPY, true);
+    } catch (IOException e) {
+      throw new MetaException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+    }
   }
 
   public boolean deleteDir(Path f, boolean recursive) throws MetaException {
@@ -234,15 +238,23 @@ public class Warehouse {
  public boolean deleteDir(Path f, boolean recursive, boolean ifPurge, boolean needCmRecycle) throws MetaException {
     // no need to create the CM recycle file for temporary tables
     if (needCmRecycle) {
-      cm.recycle(f, RecycleType.MOVE, ifPurge);
+
+      try {
+        cm.recycle(f, RecycleType.MOVE, ifPurge);
+      } catch (IOException e) {
+        throw new MetaException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+      }
     }
     FileSystem fs = getFs(f);
     return fsHandler.deleteDir(fs, f, recursive, ifPurge, conf);
   }
 
  public void recycleDirToCmPath(Path f, boolean ifPurge) throws MetaException {
-    cm.recycle(f, RecycleType.MOVE, ifPurge);
-    return;
+    try {
+      cm.recycle(f, RecycleType.MOVE, ifPurge);
+    } catch (IOException e) {
+      throw new MetaException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+    }
   }
 
   public boolean isEmpty(Path path) throws IOException, MetaException {
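Note for callers: recycle() now throws IOException rather than MetaException, so each Warehouse call site above wraps it and rethrows MetaException, leaving the metastore's Thrift-facing exception contract unchanged.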
