Repository: hive
Updated Branches:
  refs/heads/master d2cb97b6f -> 53df7e881

HIVE-18832: Support change management for trashing data files from ACID tables. (Anishek Agarwal, reviewed by Sankar Hariappan)

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/53df7e88
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/53df7e88
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/53df7e88

Branch: refs/heads/master
Commit: 53df7e881723827a29a783fcbff67a16929950ec
Parents: d2cb97b
Author: Anishek Agarwal <anis...@gmail.com>
Authored: Mon Mar 12 11:55:42 2018 +0530
Committer: Anishek Agarwal <anis...@gmail.com>
Committed: Mon Mar 12 11:55:42 2018 +0530

----------------------------------------------------------------------
 .../hadoop/hive/ql/parse/WarehouseInstance.java |   8 +-
 .../compactor/TestCleanerWithReplication.java   | 198 +++++++++++++++++++
 .../hadoop/hive/ql/txn/compactor/Cleaner.java   |  25 ++-
 .../hive/ql/txn/compactor/CompactorTest.java    |   9 +-
 .../hive/ql/txn/compactor/TestCleaner.java      |   8 -
 .../hive/ql/txn/compactor/TestCleaner2.java     |   3 -
 .../hive/ql/txn/compactor/TestInitiator.java    |   8 -
 .../hive/ql/txn/compactor/TestWorker.java       |   4 -
 .../hive/ql/txn/compactor/TestWorker2.java      |   4 -
 .../hive/metastore/ReplChangeManager.java       | 163 ++++++++-------
 .../apache/hadoop/hive/metastore/Warehouse.java |  20 +-
 11 files changed, 319 insertions(+), 131 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/53df7e88/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index 33e5157..feb1191 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -56,7 +56,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-class WarehouseInstance implements Closeable {
+public class WarehouseInstance implements Closeable {
   final String functionsRoot;
   private Logger logger;
   private IDriver driver;
@@ -85,7 +85,7 @@ class WarehouseInstance implements Closeable {
     initialize(cmRootPath.toString(), warehouseRoot.toString(), overridesForHiveConf);
   }
 
-  WarehouseInstance(Logger logger, MiniDFSCluster cluster,
+  public WarehouseInstance(Logger logger, MiniDFSCluster cluster,
       Map<String, String> overridesForHiveConf) throws Exception {
     this(logger, cluster, overridesForHiveConf, null);
   }
@@ -165,7 +165,7 @@ class WarehouseInstance implements Closeable {
     return (lastResults.get(0).split("\\t"))[colNum];
   }
 
-  WarehouseInstance run(String command) throws Throwable {
+  public WarehouseInstance run(String command) throws Throwable {
     CommandProcessorResponse ret = driver.run(command);
     if (ret.getException() != null) {
       throw ret.getException();
@@ -257,7 +257,7 @@ class WarehouseInstance implements Closeable {
     return this;
   }
 
-  List<String> getOutput() throws IOException {
+  public List<String> getOutput() throws IOException {
     List<String> results = new ArrayList<>();
     driver.getResults(results);
     return results;
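The heart of this change is in Cleaner.removeFiles() further down: before the compactor's Cleaner deletes an obsolete base/delta file, it now recycles the file into the change-management (CM) root so replication can still copy the data. A minimal sketch of that recycle-then-delete flow, using only plain Hadoop FileSystem calls; cmRoot and recycleThenDelete are illustrative names, and the real ReplChangeManager additionally checksums files, tags them with xattrs, and refreshes timestamps:

    // Illustrative sketch only; the real flow is ReplChangeManager.recycle(...)
    // followed by fs.delete(...), as added to Cleaner.removeFiles() below.
    import java.io.IOException;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class RecycleThenDeleteSketch {
      static void recycleThenDelete(FileSystem fs, Path dead, Path cmRoot) throws IOException {
        // Move the obsolete file under the CM root so a later REPL DUMP/LOAD can
        // still find its contents (the real code names it <file>_<checksum>).
        Path recycled = new Path(cmRoot, dead.getName());
        if (!fs.exists(recycled)) {
          fs.rename(dead, recycled);
        }
        // Delete whatever remains at the original location (a no-op if the
        // rename above already moved it).
        fs.delete(dead, true);
      }
    }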
http://git-wip-us.apache.org/repos/asf/hive/blob/53df7e88/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
new file mode 100644
index 0000000..c0751a7
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.shims.Utils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.security.auth.login.LoginException;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestCleanerWithReplication extends CompactorTest {
+  private Path cmRootDirectory;
+  private static FileSystem fs;
+  private static MiniDFSCluster miniDFSCluster;
+
+  @Before
+  public void setup() throws Exception {
+    conf = new HiveConf();
+    TxnDbUtil.setConfValues(conf);
+    TxnDbUtil.cleanDb(conf);
+    conf.set("fs.defaultFS", fs.getUri().toString());
+    conf.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, true);
+    ms = new HiveMetaStoreClient(conf);
+    txnHandler = TxnUtils.getTxnStore(conf);
+    cmRootDirectory = new Path(conf.get(HiveConf.ConfVars.REPLCMDIR.varname));
+    if (!fs.exists(cmRootDirectory)) {
+      fs.mkdirs(cmRootDirectory);
+    }
+    tmpdir = new File(Files.createTempDirectory("compactor_test_table_").toString());
+  }
+
+  @BeforeClass
+  public static void classLevelSetup() throws LoginException, IOException {
+    Configuration hadoopConf = new Configuration();
+    hadoopConf.set("dfs.client.use.datanode.hostname", "true");
+    hadoopConf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
+    miniDFSCluster =
+        new MiniDFSCluster.Builder(hadoopConf).numDataNodes(1).format(true).build();
+    fs = miniDFSCluster.getFileSystem();
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    fs.delete(cmRootDirectory, true);
+    compactorTestCleanup();
+  }
+
+  @AfterClass
+  public static void tearDownClass() {
+    miniDFSCluster.shutdown();
+  }
+
+  @Test
+  public void cleanupAfterMajorTableCompaction() throws Exception {
+    Table t = newTable("default", "camtc", false);
+
+    addBaseFile(t, null, 20L, 20);
+    addDeltaFile(t, null, 21L, 22L, 2);
+    addDeltaFile(t, null, 23L, 24L, 2);
+    addBaseFile(t, null, 25L, 25);
+
+    burnThroughTransactions("default", "camtc", 25);
+
+    CompactionRequest rqst = new CompactionRequest("default", "camtc", CompactionType.MAJOR);
+    txnHandler.compact(rqst);
+    CompactionInfo ci = txnHandler.findNextToCompact("fred");
+    txnHandler.markCompacted(ci);
+    txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
+
+    assertCleanerActions(6);
+  }
+
+  @Test
+  public void cleanupAfterMajorPartitionCompaction() throws Exception {
+    Table t = newTable("default", "campc", true);
+    Partition p = newPartition(t, "today");
+
+    addBaseFile(t, p, 20L, 20);
+    addDeltaFile(t, p, 21L, 22L, 2);
+    addDeltaFile(t, p, 23L, 24L, 2);
+    addBaseFile(t, p, 25L, 25);
+
+    burnThroughTransactions("default", "campc", 25);
+
+    CompactionRequest rqst = new CompactionRequest("default", "campc", CompactionType.MAJOR);
+    rqst.setPartitionname("ds=today");
+    txnHandler.compact(rqst);
+    CompactionInfo ci = txnHandler.findNextToCompact("fred");
+    txnHandler.markCompacted(ci);
+    txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
+
+    assertCleanerActions(6);
+  }
+
+  @Test
+  public void cleanupAfterMinorTableCompaction() throws Exception {
+    Table t = newTable("default", "camitc", false);
+
+    addBaseFile(t, null, 20L, 20);
+    addDeltaFile(t, null, 21L, 22L, 2);
+    addDeltaFile(t, null, 23L, 24L, 2);
+    addDeltaFile(t, null, 21L, 24L, 4);
+
+    burnThroughTransactions("default", "camitc", 25);
+
+    CompactionRequest rqst = new CompactionRequest("default", "camitc", CompactionType.MINOR);
+    txnHandler.compact(rqst);
+    CompactionInfo ci = txnHandler.findNextToCompact("fred");
+    txnHandler.markCompacted(ci);
+    txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
+
+    assertCleanerActions(4);
+  }
+
+  @Test
+  public void cleanupAfterMinorPartitionCompaction() throws Exception {
+    Table t = newTable("default", "camipc", true);
+    Partition p = newPartition(t, "today");
+
+    addBaseFile(t, p, 20L, 20);
+    addDeltaFile(t, p, 21L, 22L, 2);
+    addDeltaFile(t, p, 23L, 24L, 2);
+    addDeltaFile(t, p, 21L, 24L, 4);
+
+    burnThroughTransactions("default", "camipc", 25);
+
+    CompactionRequest rqst = new CompactionRequest("default", "camipc", CompactionType.MINOR);
+    rqst.setPartitionname("ds=today");
+    txnHandler.compact(rqst);
+    CompactionInfo ci = txnHandler.findNextToCompact("fred");
+    txnHandler.markCompacted(ci);
+    txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
+
+    assertCleanerActions(4);
+  }
+
+  private void assertCleanerActions(int expectedNumOfCleanedFiles) throws Exception {
+    assertEquals("there should be no deleted files in cm root", 0,
+        fs.listStatus(cmRootDirectory).length);
+
+    startCleaner();
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(1, rsp.getCompactsSize());
+    String state = rsp.getCompacts().get(0).getState();
+    Assert.assertTrue("unexpected state " + state, TxnStore.SUCCEEDED_RESPONSE.equals(state));
+
+    assertEquals(
+        "there should be " + String.valueOf(expectedNumOfCleanedFiles) + " deleted files in cm root",
+        expectedNumOfCleanedFiles, fs.listStatus(cmRootDirectory).length
+    );
+  }
+
+  @Override
+  boolean useHive130DeltaDirName() {
+    return false;
+  }
+}
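All four tests in the new file above walk the compaction queue through the same lifecycle before letting the Cleaner run. Pulled out for clarity (every call below appears verbatim in the tests; a configured metastore backing database is assumed):

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.api.CompactionRequest;
    import org.apache.hadoop.hive.metastore.api.CompactionType;
    import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
    import org.apache.hadoop.hive.metastore.txn.TxnStore;
    import org.apache.hadoop.hive.metastore.txn.TxnUtils;

    public class CompactionLifecycleSketch {
      public static void main(String[] args) throws Exception {
        HiveConf conf = new HiveConf();   // assumes TxnDbUtil-style test config
        TxnStore txnHandler = TxnUtils.getTxnStore(conf);
        // 1. Enqueue a major compaction for default.camtc.
        txnHandler.compact(new CompactionRequest("default", "camtc", CompactionType.MAJOR));
        // 2. A worker ("fred" is just a worker id) claims it and marks it done.
        CompactionInfo ci = txnHandler.findNextToCompact("fred");
        txnHandler.markCompacted(ci);
        txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
        // 3. The Cleaner now sees a "ready for cleaning" entry, recycles the
        //    obsolete base/delta files into cmroot, and then deletes them.
      }
    }

The expected counts then follow from the directory layout: assuming CompactorTest's helpers create two bucket files per base/delta directory, a major compaction obsoletes three directories (3 x 2 = 6 recycled files) and a minor one obsoletes two (2 x 2 = 4).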
http://git-wip-us.apache.org/repos/asf/hive/blob/53df7e88/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index af0884c..df9a5a0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;
 
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,6 +49,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * A class to clean directories after compactions.  This will run in a separate thread.
@@ -55,12 +57,18 @@ import java.util.concurrent.TimeUnit;
 public class Cleaner extends CompactorThread {
   static final private String CLASS_NAME = Cleaner.class.getName();
   static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
-  private long cleanerCheckInterval = 0;
+  private ReplChangeManager replChangeManager;
 
   // List of compactions to clean.
-  private Map<Long, Set<Long>> compactId2LockMap = new HashMap<Long, Set<Long>>();
-  private Map<Long, CompactionInfo> compactId2CompactInfoMap = new HashMap<Long, CompactionInfo>();
+  private Map<Long, Set<Long>> compactId2LockMap = new HashMap<>();
+  private Map<Long, CompactionInfo> compactId2CompactInfoMap = new HashMap<>();
+
+  @Override
+  public void init(AtomicBoolean stop, AtomicBoolean looped) throws MetaException {
+    super.init(stop, looped);
+    replChangeManager = ReplChangeManager.getInstance(conf);
+  }
 
   @Override
   public void run() {
@@ -245,10 +253,10 @@ public class Cleaner extends CompactorThread {
      * Each Compaction only compacts as far as the highest txn id such that all txns below it
      * are resolved (i.e. not opened).  This is what "highestWriteId" tracks.  This is only tracked
      * since Hive 1.3.0/2.0 - thus may be 0.  See ValidCompactorWriteIdList and uses for more info.
-     * 
+     *
      * We only want to clean up to the highestWriteId - otherwise we risk deleting deltas from
      * under an active reader.
-     * 
+     *
      * Suppose we have deltas D2 D3 for table T, i.e. the last compaction created D3 so now there is a
      * clean request for D2.
      * Cleaner checks existing locks and finds none.
@@ -258,8 +266,9 @@ public class Cleaner extends CompactorThread {
      * unless ValidTxnList is "capped" at highestWriteId.
      */
     final ValidWriteIdList txnList = (ci.highestWriteId > 0)
-        ? new ValidReaderWriteIdList(ci.getFullTableName(), new long[0], new BitSet(), ci.highestWriteId)
-        : new ValidReaderWriteIdList();
+        ? new ValidReaderWriteIdList(ci.getFullTableName(), new long[0], new BitSet(),
+            ci.highestWriteId)
+        : new ValidReaderWriteIdList();
 
     if (runJobAsSelf(ci.runAs)) {
       removeFiles(location, txnList);
@@ -306,8 +315,8 @@ public class Cleaner extends CompactorThread {
 
     for (Path dead : filesToDelete) {
       LOG.debug("Going to delete path " + dead.toString());
+      replChangeManager.recycle(dead, ReplChangeManager.RecycleType.MOVE, true);
       fs.delete(dead, true);
     }
   }
-
 }
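The long comment in the hunk above explains why cleaning must be capped at ci.highestWriteId. A small illustration of what that cap means in practice (values are made up; the constructor shape matches the one the Cleaner uses):

    import java.util.BitSet;

    import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
    import org.apache.hadoop.hive.common.ValidWriteIdList;

    public class CappedWriteIdSketch {
      public static void main(String[] args) {
        // Same constructor shape the Cleaner uses: no open/aborted exceptions,
        // high-water mark capped at the compaction's highestWriteId (say 24).
        ValidWriteIdList capped =
            new ValidReaderWriteIdList("default.camtc", new long[0], new BitSet(), 24);
        System.out.println(capped.isWriteIdValid(24)); // true  - at or below the cap
        System.out.println(capped.isWriteIdValid(25)); // false - above the cap, so files
                                                       // written by txn 25 are left alone
      }
    }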
http://git-wip-us.apache.org/repos/asf/hive/blob/53df7e88/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index 7ea017a..083c671 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.mapred.RecordWriter;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.Progressable;
 import org.apache.thrift.TException;
+import org.junit.Before;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -92,19 +93,19 @@ public abstract class CompactorTest {
   protected TxnStore txnHandler;
   protected IMetaStoreClient ms;
-  protected long sleepTime = 1000;
   protected HiveConf conf;
 
   private final AtomicBoolean stop = new AtomicBoolean();
-  private final File tmpdir;
+  protected File tmpdir;
 
-  protected CompactorTest() throws Exception {
+  @Before
+  public void setup() throws Exception {
     conf = new HiveConf();
     TxnDbUtil.setConfValues(conf);
     TxnDbUtil.cleanDb(conf);
     ms = new HiveMetaStoreClient(conf);
     txnHandler = TxnUtils.getTxnStore(conf);
-    tmpdir = new File (Files.createTempDirectory("compactor_test_table_").toString());
+    tmpdir = new File(Files.createTempDirectory("compactor_test_table_").toString());
   }
 
   protected void compactorTestCleanup() throws IOException {

http://git-wip-us.apache.org/repos/asf/hive/blob/53df7e88/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index 3ca073c..ce574b4 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -40,8 +40,6 @@ import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -54,12 +52,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
  */
 public class TestCleaner extends CompactorTest {
 
-  static final private Logger LOG = LoggerFactory.getLogger(TestCleaner.class.getName());
-
-  public TestCleaner() throws Exception {
-    super();
-  }
-
   @Test
   public void nothing() throws Exception {
     // Test that the whole things works when there's nothing in the queue.  This is just a

http://git-wip-us.apache.org/repos/asf/hive/blob/53df7e88/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner2.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner2.java
index 1cbd9bc..e2aeb9c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner2.java
@@ -21,9 +21,6 @@ package org.apache.hadoop.hive.ql.txn.compactor;
  * Same as TestCleaner but tests delta file names in Hive 1.3.0 format
  */
 public class TestCleaner2 extends TestCleaner {
-  public TestCleaner2() throws Exception {
-    super();
-  }
 
   @Override
   boolean useHive130DeltaDirName() {
     return false;

http://git-wip-us.apache.org/repos/asf/hive/blob/53df7e88/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
index d2818db..5648393 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
@@ -41,8 +41,6 @@ import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -55,12 +53,6 @@ import java.util.concurrent.TimeUnit;
  * Tests for the compactor Initiator thread.
  */
 public class TestInitiator extends CompactorTest {
-  static final private String CLASS_NAME = TestInitiator.class.getName();
-  static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
-
-  public TestInitiator() throws Exception {
-    super();
-  }
 
   @Test
   public void nothing() throws Exception {

http://git-wip-us.apache.org/repos/asf/hive/blob/53df7e88/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
index 1d9c9a7..488cd90 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
@@ -65,10 +65,6 @@ public class TestWorker extends CompactorTest {
   static final private String CLASS_NAME = TestWorker.class.getName();
   static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
 
-  public TestWorker() throws Exception {
-    super();
-  }
-
   @Test
   public void nothing() throws Exception {
     // Test that the whole things works when there's nothing in the queue.  This is just a

http://git-wip-us.apache.org/repos/asf/hive/blob/53df7e88/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker2.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker2.java
index f07c050..b6b8788 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker2.java
@@ -22,10 +22,6 @@ package org.apache.hadoop.hive.ql.txn.compactor;
  */
 public class TestWorker2 extends TestWorker {
 
-  public TestWorker2() throws Exception {
-    super();
-  }
-
   @Override
   boolean useHive130DeltaDirName() {
     return true;
http://git-wip-us.apache.org/repos/asf/hive/blob/53df7e88/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
index 95fa0a9..7c1d5f5 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
@@ -163,104 +163,99 @@
    * @param ifPurge if the file should skip Trash when move/delete source file.
    *                This is referred only if type is MOVE.
    * @return int
-   * @throws MetaException
+   * @throws IOException
    */
-  int recycle(Path path, RecycleType type, boolean ifPurge) throws MetaException {
+  public int recycle(Path path, RecycleType type, boolean ifPurge) throws IOException {
     if (!enabled) {
       return 0;
     }
 
-    try {
-      int count = 0;
-
-      if (fs.isDirectory(path)) {
-        FileStatus[] files = fs.listStatus(path, hiddenFileFilter);
-        for (FileStatus file : files) {
-          count += recycle(file.getPath(), type, ifPurge);
-        }
-      } else {
-        String fileCheckSum = checksumFor(path, fs);
-        Path cmPath = getCMPath(conf, path.getName(), fileCheckSum);
-
-        // set timestamp before moving to cmroot, so we can
-        // avoid race condition CM remove the file before setting
-        // timestamp
-        long now = System.currentTimeMillis();
-        fs.setTimes(path, now, -1);
-
-        boolean success = false;
-        if (fs.exists(cmPath) && fileCheckSum.equalsIgnoreCase(checksumFor(cmPath, fs))) {
-          // If already a file with same checksum exists in cmPath, just ignore the copy/move
-          // Also, mark the operation is unsuccessful to notify that file with same name already
-          // exist which will ensure the timestamp of cmPath is updated to avoid clean-up by
-          // CM cleaner.
-          success = false;
-        } else {
-          switch (type) {
-            case MOVE: {
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Moving {} to {}", path.toString(), cmPath.toString());
-              }
-              // Rename fails if the file with same name already exist.
-              success = fs.rename(path, cmPath);
-              break;
-            }
-            case COPY: {
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Copying {} to {}", path.toString(), cmPath.toString());
-              }
-              // It is possible to have a file with same checksum in cmPath but the content is
-              // partially copied or corrupted. In this case, just overwrite the existing file with
-              // new one.
-              success = FileUtils.copy(fs, path, fs, cmPath, false, true, conf);
-              break;
-            }
-            default:
-              // Operation fails as invalid input
-              break;
-          }
-        }
-
-        // Ignore if a file with same content already exist in cmroot
-        // We might want to setXAttr for the new location in the future
-        if (success) {
-          // set the file owner to hive (or the id metastore run as)
-          fs.setOwner(cmPath, msUser, msGroup);
-
-          // tag the original file name so we know where the file comes from
-          // Note we currently only track the last known trace as
-          // xattr has limited capacity. We shall revisit and store all original
-          // locations if orig-loc becomes important
-          try {
-            fs.setXAttr(cmPath, ORIG_LOC_TAG, path.toString().getBytes());
-          } catch (UnsupportedOperationException e) {
-            LOG.warn("Error setting xattr for {}", path.toString());
-          }
-
-          count++;
-        } else {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("A file with the same content of {} already exists, ignore", path.toString());
-          }
-          // Need to extend the tenancy if we saw a newer file with the same content
-          fs.setTimes(cmPath, now, -1);
-        }
-
-        // Tag if we want to remain in trash after deletion.
-        // If multiple files share the same content, then
-        // any file claim remain in trash would be granted
-        if ((type == RecycleType.MOVE) && !ifPurge) {
-          try {
-            fs.setXAttr(cmPath, REMAIN_IN_TRASH_TAG, new byte[]{0});
-          } catch (UnsupportedOperationException e) {
-            LOG.warn("Error setting xattr for {}", cmPath.toString());
-          }
-        }
-      }
-      return count;
-    } catch (IOException e) {
-      throw new MetaException(StringUtils.stringifyException(e));
-    }
+    int count = 0;
+    if (fs.isDirectory(path)) {
+      FileStatus[] files = fs.listStatus(path, hiddenFileFilter);
+      for (FileStatus file : files) {
+        count += recycle(file.getPath(), type, ifPurge);
+      }
+    } else {
+      String fileCheckSum = checksumFor(path, fs);
+      Path cmPath = getCMPath(conf, path.getName(), fileCheckSum);
+
+      // set timestamp before moving to cmroot, so we can
+      // avoid race condition CM remove the file before setting
+      // timestamp
+      long now = System.currentTimeMillis();
+      fs.setTimes(path, now, -1);
+
+      boolean success = false;
+      if (fs.exists(cmPath) && fileCheckSum.equalsIgnoreCase(checksumFor(cmPath, fs))) {
+        // If already a file with same checksum exists in cmPath, just ignore the copy/move
+        // Also, mark the operation is unsuccessful to notify that file with same name already
+        // exist which will ensure the timestamp of cmPath is updated to avoid clean-up by
+        // CM cleaner.
+        success = false;
+      } else {
+        switch (type) {
+          case MOVE: {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Moving {} to {}", path.toString(), cmPath.toString());
+            }
+            // Rename fails if the file with same name already exist.
+            success = fs.rename(path, cmPath);
+            break;
+          }
+          case COPY: {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Copying {} to {}", path.toString(), cmPath.toString());
+            }
+            // It is possible to have a file with same checksum in cmPath but the content is
+            // partially copied or corrupted. In this case, just overwrite the existing file with
+            // new one.
+            success = FileUtils.copy(fs, path, fs, cmPath, false, true, conf);
+            break;
+          }
+          default:
+            // Operation fails as invalid input
+            break;
+        }
+      }
+
+      // Ignore if a file with same content already exist in cmroot
+      // We might want to setXAttr for the new location in the future
+      if (success) {
+        // set the file owner to hive (or the id metastore run as)
+        fs.setOwner(cmPath, msUser, msGroup);
+
+        // tag the original file name so we know where the file comes from
+        // Note we currently only track the last known trace as
+        // xattr has limited capacity. We shall revisit and store all original
+        // locations if orig-loc becomes important
+        try {
+          fs.setXAttr(cmPath, ORIG_LOC_TAG, path.toString().getBytes());
+        } catch (UnsupportedOperationException e) {
+          LOG.warn("Error setting xattr for {}", path.toString());
+        }
+
+        count++;
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("A file with the same content of {} already exists, ignore", path.toString());
+        }
+        // Need to extend the tenancy if we saw a newer file with the same content
+        fs.setTimes(cmPath, now, -1);
+      }
+
+      // Tag if we want to remain in trash after deletion.
+      // If multiple files share the same content, then
+      // any file claim remain in trash would be granted
+      if ((type == RecycleType.MOVE) && !ifPurge) {
+        try {
+          fs.setXAttr(cmPath, REMAIN_IN_TRASH_TAG, new byte[] { 0 });
+        } catch (UnsupportedOperationException e) {
+          LOG.warn("Error setting xattr for {}", cmPath.toString());
+        }
+      }
+    }
+    return count;
   }
 
   // Get checksum of a file
@@ -289,7 +284,7 @@ public class ReplChangeManager {
    * @param checkSum checksum of the file, can be retrieved by {@link #checksumFor(Path, FileSystem)}
    * @return Path
    */
-  static Path getCMPath(Configuration conf, String name, String checkSum) throws IOException, MetaException {
+  static Path getCMPath(Configuration conf, String name, String checkSum) {
     String newFileName = name + "_" + checkSum;
     int maxLength = conf.getInt(DFSConfigKeys.DFS_NAMENODE_MAX_COMPONENT_LENGTH_KEY,
         DFSConfigKeys.DFS_NAMENODE_MAX_COMPONENT_LENGTH_DEFAULT);
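With recycle() now public and declaring IOException, callers outside the metastore, such as the Cleaner above, can invoke it directly. A hedged sketch of the call shape (the file path is invented; getInstance and RecycleType appear in the Cleaner hunk earlier):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.ReplChangeManager;

    public class RecycleCallSketch {
      public static void main(String[] args) throws Exception {
        HiveConf conf = new HiveConf();   // assumes change management is enabled with a cmroot set
        ReplChangeManager cm = ReplChangeManager.getInstance(conf);
        // MOVE the file into cmroot under a checksum-derived name; ifPurge=true
        // skips Trash on the source side. Returns the number of files recycled
        // (0 when change management is disabled).
        int recycled = cm.recycle(new Path("/warehouse/t/base_0000020/bucket_00000"),
            ReplChangeManager.RecycleType.MOVE, true);
        System.out.println(recycled + " file(s) recycled");
      }
    }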
http://git-wip-us.apache.org/repos/asf/hive/blob/53df7e88/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java
index 445a7b8..d4a0819 100755
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java
@@ -220,7 +220,11 @@ public class Warehouse {
   }
 
   void addToChangeManagement(Path file) throws MetaException {
-    cm.recycle(file, RecycleType.COPY, true);
+    try {
+      cm.recycle(file, RecycleType.COPY, true);
+    } catch (IOException e) {
+      throw new MetaException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+    }
   }
 
   public boolean deleteDir(Path f, boolean recursive) throws MetaException {
@@ -234,15 +238,23 @@ public class Warehouse {
   public boolean deleteDir(Path f, boolean recursive, boolean ifPurge, boolean needCmRecycle) throws MetaException {
     // no need to create the CM recycle file for temporary tables
     if (needCmRecycle) {
-      cm.recycle(f, RecycleType.MOVE, ifPurge);
+
+      try {
+        cm.recycle(f, RecycleType.MOVE, ifPurge);
+      } catch (IOException e) {
+        throw new MetaException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+      }
     }
     FileSystem fs = getFs(f);
     return fsHandler.deleteDir(fs, f, recursive, ifPurge, conf);
   }
 
   public void recycleDirToCmPath(Path f, boolean ifPurge) throws MetaException {
-    cm.recycle(f, RecycleType.MOVE, ifPurge);
-    return;
+    try {
+      cm.recycle(f, RecycleType.MOVE, ifPurge);
+    } catch (IOException e) {
+      throw new MetaException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+    }
   }
 
   public boolean isEmpty(Path path) throws IOException, MetaException {
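None of this machinery is active unless change management is turned on; the Warehouse methods above merely wrap recycle()'s new IOException into the MetaException their signatures already declare. A minimal sketch of the two settings that enable the feature, mirroring what TestCleanerWithReplication's setup() does (the cmroot path is illustrative):

    import org.apache.hadoop.hive.conf.HiveConf;

    public class EnableCmSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Turn change management on and point it at a cmroot directory.
        conf.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, true);
        conf.set(HiveConf.ConfVars.REPLCMDIR.varname, "/user/hive/cmroot");
        // A metastore (and hence the compactor Cleaner) built from this conf will
        // recycle deleted ACID files into cmroot instead of dropping them outright.
        System.out.println(conf.getBoolVar(HiveConf.ConfVars.REPLCMENABLED)); // true
      }
    }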