Repository: hive
Updated Branches:
  refs/heads/master 212f18091 -> b558d49b0
HIVE-15448: ChangeManager (Daniel Dai, reviewed by Thejas Nair)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b558d49b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b558d49b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b558d49b

Branch: refs/heads/master
Commit: b558d49b03abf541dce093fd3f360a2e7b87de2e
Parents: 212f180
Author: Daniel Dai <da...@hortonworks.com>
Authored: Tue Dec 27 16:33:28 2016 -0800
Committer: Daniel Dai <da...@hortonworks.com>
Committed: Tue Dec 27 16:33:28 2016 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  10 +
 .../hive/metastore/TestReplChangeManager.java   | 318 +++++++++++++++++++
 .../hadoop/hive/metastore/HiveMetaStore.java    |   2 +
 .../hive/metastore/ReplChangeManager.java       | 281 ++++++++++++++++
 .../apache/hadoop/hive/metastore/Warehouse.java |   4 +-
 5 files changed, 613 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/b558d49b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index b4e89b0..8bb7337 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -437,6 +437,16 @@ public class HiveConf extends Configuration {
         "with ${hive.scratch.dir.permission}."),
     REPLDIR("hive.repl.rootdir","/user/hive/repl/",
         "HDFS root dir for all replication dumps."),
+    REPLCMENABLED("hive.repl.cm.enabled", false,
+        "Turn on ChangeManager, so deleted files will go to cmrootdir."),
+    REPLCMDIR("hive.repl.cmrootdir","/user/hive/cmroot/",
+        "Root dir for ChangeManager, used for deleted files."),
+    REPLCMRETAIN("hive.repl.cm.retain","24h",
+        new TimeValidator(TimeUnit.HOURS),
+        "Time to retain removed files in cmrootdir."),
+    REPLCMINTERVAL("hive.repl.cm.interval","3600s",
+        new TimeValidator(TimeUnit.SECONDS),
+        "Interval for the cmroot cleanup thread."),
     LOCALSCRATCHDIR("hive.exec.local.scratchdir",
         "${system:java.io.tmpdir}" + File.separator + "${system:user.name}",
         "Local scratch space for Hive jobs"),

http://git-wip-us.apache.org/repos/asf/hive/blob/b558d49b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java
new file mode 100644
index 0000000..0587221
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java
@@ -0,0 +1,318 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;
+import org.apache.hadoop.util.StringUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+
+public class TestReplChangeManager {
+  private static HiveMetaStoreClient client;
+  private static HiveConf hiveConf;
+  private static Warehouse warehouse;
+  private static MiniDFSCluster m_dfs;
+  private static String cmroot;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    m_dfs = new MiniDFSCluster.Builder(new Configuration()).numDataNodes(1).format(true).build();
+    hiveConf = new HiveConf(TestReplChangeManager.class);
+    hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
+        "hdfs://" + m_dfs.getNameNode().getHostAndPort() + HiveConf.ConfVars.METASTOREWAREHOUSE.defaultStrVal);
+    hiveConf.setBoolean(HiveConf.ConfVars.REPLCMENABLED.varname, true);
+    cmroot = "hdfs://" + m_dfs.getNameNode().getHostAndPort() + "/cmroot";
+    hiveConf.set(HiveConf.ConfVars.REPLCMDIR.varname, cmroot);
+    warehouse = new Warehouse(hiveConf);
+    try {
+      client = new HiveMetaStoreClient(hiveConf);
+    } catch (Throwable e) {
+      System.err.println("Unable to open the metastore");
+      System.err.println(StringUtils.stringifyException(e));
+      throw e;
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    try {
+      m_dfs.shutdown();
+      client.close();
+    } catch (Throwable e) {
+      System.err.println("Unable to close metastore");
+      System.err.println(StringUtils.stringifyException(e));
+      throw e;
+    }
+  }
+
+  private Partition createPartition(String dbName, String tblName,
+      List<FieldSchema> columns, List<String> partVals, SerDeInfo serdeInfo) {
+    StorageDescriptor sd = new StorageDescriptor(columns, null,
+        "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
+        "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat",
+        false, 0, serdeInfo, null, null, null);
+    return new Partition(partVals, dbName, tblName, 0, 0, sd, null);
+  }
+
+  private void createFile(Path path, String content) throws IOException {
+    FSDataOutputStream output = path.getFileSystem(hiveConf).create(path);
+    output.writeChars(content);
+    output.close();
+  }
+
+  @Test
+  public void testRecyclePartTable() throws Exception {
+    // Create db1/t1/dt=20160101/part
+    //              /dt=20160102/part
+    //              /dt=20160103/part
+    // Test: recycle single partition (dt=20160101)
+    //       recycle table t1
+    String dbName = "db1";
+    client.dropDatabase(dbName, true, true);
+
+    Database db = new Database();
+    db.setName(dbName);
+    client.createDatabase(db);
+
+    String tblName = "t1";
+    List<FieldSchema> columns = new ArrayList<FieldSchema>();
+    columns.add(new FieldSchema("foo", "string", ""));
+    columns.add(new FieldSchema("bar", "string", ""));
+
+    List<FieldSchema> partColumns = new ArrayList<FieldSchema>();
+    partColumns.add(new FieldSchema("dt", "string", ""));
+
+    SerDeInfo serdeInfo = new SerDeInfo("LBCSerDe", LazyBinaryColumnarSerDe.class.getCanonicalName(), new HashMap<String, String>());
+
+    StorageDescriptor sd
+      = new StorageDescriptor(columns, null,
+          "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
+          "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat",
+          false, 0, serdeInfo, null, null, null);
+    Map<String, String> tableParameters = new HashMap<String, String>();
+
+    Table tbl = new Table(tblName, dbName, "", 0, 0, 0, sd, partColumns, tableParameters, "", "", "");
+
+    client.createTable(tbl);
+
+    List<String> values = Arrays.asList("20160101");
+    Partition part1 = createPartition(dbName, tblName, columns, values, serdeInfo);
+    client.add_partition(part1);
+
+    values = Arrays.asList("20160102");
+    Partition part2 = createPartition(dbName, tblName, columns, values, serdeInfo);
+    client.add_partition(part2);
+
+    values = Arrays.asList("20160103");
+    Partition part3 = createPartition(dbName, tblName, columns, values, serdeInfo);
+    client.add_partition(part3);
+
+    Path part1Path = new Path(warehouse.getPartitionPath(db, tblName, ImmutableMap.of("dt", "20160101")), "part");
+    createFile(part1Path, "p1");
+    String path1Sig = ReplChangeManager.getCksumString(part1Path, hiveConf);
+
+    Path part2Path = new Path(warehouse.getPartitionPath(db, tblName, ImmutableMap.of("dt", "20160102")), "part");
+    createFile(part2Path, "p2");
+    String path2Sig = ReplChangeManager.getCksumString(part2Path, hiveConf);
+
+    Path part3Path = new Path(warehouse.getPartitionPath(db, tblName, ImmutableMap.of("dt", "20160103")), "part");
+    createFile(part3Path, "p3");
+    String path3Sig = ReplChangeManager.getCksumString(part3Path, hiveConf);
+
+    Assert.assertTrue(part1Path.getFileSystem(hiveConf).exists(part1Path));
+    Assert.assertTrue(part2Path.getFileSystem(hiveConf).exists(part2Path));
+    Assert.assertTrue(part3Path.getFileSystem(hiveConf).exists(part3Path));
+
+    ReplChangeManager cm = ReplChangeManager.getInstance(hiveConf, warehouse);
+    // verify cm.recycle(db, table, part) api moves file to cmroot dir
+    int ret = cm.recycle(db, tbl, part1);
+    Assert.assertEquals(ret, 0);
+    ret = cm.recycle(db, tbl, part2);
+    Assert.assertEquals(ret, 0);
+    ret = cm.recycle(db, tbl, part3);
+    Assert.assertEquals(ret, 0);
+
+    Assert.assertFalse(part1Path.getFileSystem(hiveConf).exists(part1Path));
+    Assert.assertFalse(part2Path.getFileSystem(hiveConf).exists(part2Path));
+    Assert.assertFalse(part3Path.getFileSystem(hiveConf).exists(part3Path));
+
+    client.dropPartition(dbName, tblName, Arrays.asList("20160101"));
+
+    Path cmPart1Path = ReplChangeManager.getCMPath(part1Path, hiveConf, path1Sig);
+    Assert.assertTrue(cmPart1Path.getFileSystem(hiveConf).exists(cmPart1Path));
+
+    client.dropTable(dbName, tblName);
+
+    Path cmPart2Path = ReplChangeManager.getCMPath(part2Path, hiveConf, path2Sig);
+    Assert.assertTrue(cmPart2Path.getFileSystem(hiveConf).exists(cmPart2Path));
+
+    Path cmPart3Path = ReplChangeManager.getCMPath(part3Path, hiveConf, path3Sig);
+    Assert.assertTrue(cmPart3Path.getFileSystem(hiveConf).exists(cmPart3Path));
+
+    client.dropDatabase(dbName, true, true);
+  }
+
+  @Test
+  public void testRecycleNonPartTable() throws Exception {
+    // Create db2/t1/part1
+    //              /part2
+    //              /part3
+    // Test: recycle single file (part1)
+    //       recycle table t1
+    String dbName = "db2";
+    client.dropDatabase(dbName, true, true);
+
+    Database db = new Database();
+    db.setName(dbName);
+    client.createDatabase(db);
+
+    String tblName = "t1";
+    List<FieldSchema> columns = new ArrayList<FieldSchema>();
+    columns.add(new FieldSchema("foo", "string", ""));
+    columns.add(new FieldSchema("bar", "string", ""));
+
+    SerDeInfo serdeInfo = new SerDeInfo("LBCSerDe", LazyBinaryColumnarSerDe.class.getCanonicalName(), new HashMap<String, String>());
+
+    StorageDescriptor sd
+      = new StorageDescriptor(columns, null,
+          "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
+          "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat",
+          false, 0, serdeInfo, null, null, null);
+    Map<String, String> tableParameters = new HashMap<String, String>();
+
+    Table tbl = new Table(tblName, dbName, "", 0, 0, 0, sd, null, tableParameters, "", "", "");
+
+    client.createTable(tbl);
+
+    Path filePath1 = new Path(warehouse.getTablePath(db, tblName), "part1");
+    createFile(filePath1, "f1");
+    String fileSig1 = ReplChangeManager.getCksumString(filePath1, hiveConf);
+
+    Path filePath2 = new Path(warehouse.getTablePath(db, tblName), "part2");
+    createFile(filePath2, "f2");
+    String fileSig2 = ReplChangeManager.getCksumString(filePath2, hiveConf);
+
+    Path filePath3 = new Path(warehouse.getTablePath(db, tblName), "part3");
+    createFile(filePath3, "f3");
+    String fileSig3 = ReplChangeManager.getCksumString(filePath3, hiveConf);
+
+    Assert.assertTrue(filePath1.getFileSystem(hiveConf).exists(filePath1));
+    Assert.assertTrue(filePath2.getFileSystem(hiveConf).exists(filePath2));
+    Assert.assertTrue(filePath3.getFileSystem(hiveConf).exists(filePath3));
+
+    ReplChangeManager cm = ReplChangeManager.getInstance(hiveConf, warehouse);
+    // verify cm.recycle(Path) api moves file to cmroot dir
+    cm.recycle(filePath1);
+    Assert.assertFalse(filePath1.getFileSystem(hiveConf).exists(filePath1));
+
+    Path cmPath1 = ReplChangeManager.getCMPath(filePath1, hiveConf, fileSig1);
+    Assert.assertTrue(cmPath1.getFileSystem(hiveConf).exists(cmPath1));
+
+    // verify cm.recycle(db, table) api moves file to cmroot dir
+    int ret = cm.recycle(db, tbl);
+    Assert.assertEquals(ret, 0);
+
+    Assert.assertFalse(filePath2.getFileSystem(hiveConf).exists(filePath2));
+    Assert.assertFalse(filePath3.getFileSystem(hiveConf).exists(filePath3));
+
+    client.dropTable(dbName, tblName);
+
+    Path cmPath2 = ReplChangeManager.getCMPath(filePath2, hiveConf, fileSig2);
+    Assert.assertTrue(cmPath2.getFileSystem(hiveConf).exists(cmPath2));
+
+    Path cmPath3 = ReplChangeManager.getCMPath(filePath3, hiveConf, fileSig3);
+    Assert.assertTrue(cmPath3.getFileSystem(hiveConf).exists(cmPath3));
+
+    client.dropDatabase(dbName, true, true);
+  }
+
+  @Test
+  public void testClearer() throws Exception {
+    FileSystem fs = new Path(cmroot).getFileSystem(hiveConf);
+    long now = System.currentTimeMillis();
+    Path dirDb = new Path(cmroot, "db3");
+    fs.mkdirs(dirDb);
+    Path dirTbl1 = new Path(dirDb, "tbl1");
+    fs.mkdirs(dirTbl1);
+    Path part11 = new Path(dirTbl1, "part1");
+    fs.create(part11).close();
+    fs.setTimes(part11, now - 86400*1000*2, now - 86400*1000*2);
+    Path part12 = new Path(dirTbl1, "part2");
+    fs.create(part12).close();
+    Path dirTbl2 = new Path(dirDb, "tbl2");
+    fs.mkdirs(dirTbl2);
+    Path part21 = new Path(dirTbl2, "part1");
+    fs.create(part21).close();
+    fs.setTimes(part21, now - 86400*1000*2, now - 86400*1000*2);
+    Path part22 = new Path(dirTbl2, "part2");
+    fs.create(part22).close();
+    Path dirTbl3 = new Path(dirDb, "tbl3");
+    fs.mkdirs(dirTbl3);
+    Path part31 = new Path(dirTbl3, "part1");
+    fs.create(part31).close();
+    fs.setTimes(part31, now - 86400*1000*2, now - 86400*1000*2);
+    Path part32 = new Path(dirTbl3, "part2");
+    fs.create(part32).close();
+    fs.setTimes(part32, now - 86400*1000*2, now - 86400*1000*2);
+
+    ReplChangeManager.scheduleCMClearer(hiveConf);
+
+    long start = System.currentTimeMillis();
+    long end;
+    boolean cleared = false;
+    do {
+      Thread.sleep(200);
+      end = System.currentTimeMillis();
+      if (end - start > 5000) {
+        Assert.fail("timeout, cmroot has not been cleared");
+      }
+      if (!part11.getFileSystem(hiveConf).exists(part11) &&
+          part12.getFileSystem(hiveConf).exists(part12) &&
+          !part21.getFileSystem(hiveConf).exists(part21) &&
+          part22.getFileSystem(hiveConf).exists(part22) &&
+          !dirTbl3.getFileSystem(hiveConf).exists(dirTbl3)) {
+        cleared = true;
+      }
+    } while (!cleared);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/b558d49b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 2892da3..121b825 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -7238,6 +7238,8 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           } finally {
             startLock.unlock();
           }
+
+          ReplChangeManager.scheduleCMClearer(conf);
         }
       };
       t.setDaemon(true);

http://git-wip-us.apache.org/repos/asf/hive/blob/b558d49b/metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
new file mode 100644
index 0000000..a49a80e
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
@@ -0,0 +1,281 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReplChangeManager {
+  private static final Logger LOG = LoggerFactory.getLogger(ReplChangeManager.class);
+  static private ReplChangeManager instance;
+
+  private static boolean inited = false;
+  private static boolean enabled = false;
+  private static Path cmroot;
+  private static HiveConf conf;
+  private static Warehouse wh;
+  private String user;
+  private String group;
+
+  public static ReplChangeManager getInstance(HiveConf conf, Warehouse wh) throws IOException {
+    if (instance == null) {
+      instance = new ReplChangeManager(conf, wh);
+    }
+    return instance;
+  }
+
+  ReplChangeManager(HiveConf conf, Warehouse wh) throws IOException {
+    if (!inited) {
+      if (conf.getBoolVar(HiveConf.ConfVars.REPLCMENABLED)) {
+        ReplChangeManager.enabled = true;
+        ReplChangeManager.cmroot = new Path(conf.get(HiveConf.ConfVars.REPLCMDIR.varname));
+        ReplChangeManager.conf = conf;
+        ReplChangeManager.wh = wh;
+
+        FileSystem fs = cmroot.getFileSystem(conf);
+        // Create cmroot with permission 700 if it does not exist
+        if (!fs.exists(cmroot)) {
+          fs.mkdirs(cmroot);
+          fs.setPermission(cmroot, new FsPermission("700"));
+        }
+        UserGroupInformation usergroupInfo = UserGroupInformation.getCurrentUser();
+        user = usergroupInfo.getShortUserName();
+        group = usergroupInfo.getPrimaryGroupName();
+      }
+      inited = true;
+    }
+  }
+
+  /***
+   * Recycle a managed table, i.e. move the table's files to cmroot
+   * @param db
+   * @param table
+   * @return
+   * @throws IOException
+   * @throws MetaException
+   */
+  public int recycle(Database db, Table table) throws IOException, MetaException {
+    if (!enabled) {
+      return 0;
+    }
+
+    Path tablePath = wh.getTablePath(db, table.getTableName());
+    FileSystem fs = tablePath.getFileSystem(conf);
+    int failCount = 0;
+    for (FileStatus file : fs.listStatus(tablePath)) {
+      if (!recycle(file.getPath())) {
+        failCount++;
+      }
+    }
+    return failCount;
+  }
+
+  /***
+   * Recycle a partition of a managed table, i.e. move the partition's files to cmroot
+   * @param db
+   * @param table
+   * @param part
+   * @return
+   * @throws IOException
+   * @throws MetaException
+   */
+  public int recycle(Database db, Table table, Partition part) throws IOException, MetaException {
+    if (!enabled) {
+      return 0;
+    }
+
+    Map<String, String> pm = Warehouse.makeSpecFromValues(table.getPartitionKeys(), part.getValues());
+    Path partPath = wh.getPartitionPath(db, table.getTableName(), pm);
+    FileSystem fs = partPath.getFileSystem(conf);
+    int failCount = 0;
+    for (FileStatus file : fs.listStatus(partPath)) {
+      if (!recycle(file.getPath())) {
+        failCount++;
+      }
+    }
+    return failCount;
+  }
+
+  /***
+   * Recycle a single file (of a partition, or of the table if non-partitioned),
+   * i.e. move the file to cmroot. Note the table must be a managed table
+   * @param path
+   * @return
+   * @throws IOException
+   * @throws MetaException
+   */
+  public boolean recycle(Path path) throws IOException, MetaException {
+    if (!enabled) {
+      return true;
+    }
+
+    Path cmPath = getCMPath(path, conf, getCksumString(path, conf));
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Moving " + path.toString() + " to " + cmPath.toString());
+    }
+
+    FileSystem fs = path.getFileSystem(conf);
+
+    boolean succ = fs.rename(path, cmPath);
+    // Ignore if a file with the same content already exists in cmroot
+    // We might want to setXAttr for the new location in the future
+    if (!succ) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("A file with the same content of " + path.toString() + " already exists, ignore");
+      }
+    } else {
+      long now = System.currentTimeMillis();
+      fs.setTimes(cmPath, now, now);
+
+      // set the file owner to hive (or the id the metastore runs as)
+      fs.setOwner(cmPath, user, group);
+
+      // tag the original file name so we know where the file comes from
+      fs.setXAttr(cmPath, "user.original-loc", path.toString().getBytes());
+    }
+    return succ;
+  }
+
+  // Get the checksum of a file
+  static public String getCksumString(Path path, Configuration conf) throws IOException {
+    // TODO: fs checksum only available on hdfs, need to
+    // find a solution for other fs (eg, local fs, s3, etc)
+    FileSystem fs = path.getFileSystem(conf);
+    FileChecksum checksum = fs.getFileChecksum(path);
+    String checksumString = StringUtils.byteToHexString(
+        checksum.getBytes(), 0, checksum.getLength());
+    return checksumString;
+  }
+
+  /***
+   * Convert the path of a file inside a partition or table (if non-partitioned)
+   * to a deterministic location under cmroot, so the user can retrieve the file
+   * back with the original location plus the signature.
+   * @param path original path inside the partition or table
+   * @param conf
+   * @param signature unique signature of the file, can be retrieved by
+   *          {@link #getCksumString(Path, Configuration)}
+   * @return
+   * @throws IOException
+   * @throws MetaException
+   */
+  static public Path getCMPath(Path path, Configuration conf, String signature)
+      throws IOException, MetaException {
+    String newFileName = signature + path.getName();
+    int maxLength = conf.getInt(DFSConfigKeys.DFS_NAMENODE_MAX_COMPONENT_LENGTH_KEY,
+        DFSConfigKeys.DFS_NAMENODE_MAX_COMPONENT_LENGTH_DEFAULT);
+
+    if (newFileName.length() > maxLength) {
+      newFileName = newFileName.substring(0, maxLength-1);
+    }
+
+    Path cmPath = new Path(cmroot, newFileName);
+
+    return cmPath;
+  }
+
+  /**
+   * Thread to clear old files under cmroot recursively
+   */
+  static class CMClearer implements Runnable {
+    private Path cmroot;
+    private long secRetain;
+    private Configuration conf;
+
+    CMClearer(String cmrootString, long secRetain, Configuration conf) {
+      this.cmroot = new Path(cmrootString);
+      this.secRetain = secRetain;
+      this.conf = conf;
+    }
+
+    @Override
+    public void run() {
+      try {
+        LOG.info("CMClearer started");
+        long now = System.currentTimeMillis();
+        processDir(cmroot, now);
+      } catch (IOException e) {
+        LOG.error("Exception when clearing cmroot:" + StringUtils.stringifyException(e));
+      }
+    }
+
+    private boolean processDir(Path folder, long now) throws IOException {
+      FileStatus[] files = folder.getFileSystem(conf).listStatus(folder);
+      boolean empty = true;
+      for (FileStatus file : files) {
+        if (file.isDirectory()) {
+          if (processDir(file.getPath(), now)) {
+            file.getPath().getFileSystem(conf).delete(file.getPath(), false);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Remove " + file.toString());
+            }
+          } else {
+            empty = false;
+          }
+        } else {
+          long modifiedTime = file.getModificationTime();
+          if (now - modifiedTime > secRetain*1000) {
+            file.getPath().getFileSystem(conf).delete(file.getPath(), false);
+          } else {
+            empty = false;
+          }
+        }
+      }
+      return empty;
+    }
+  }
+
+  // Schedule the CMClearer thread. Will be invoked by the metastore
+  public static void scheduleCMClearer(HiveConf hiveConf) {
+    if (hiveConf.getBoolVar(HiveConf.ConfVars.REPLCMENABLED)) {
+      ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
+          new BasicThreadFactory.Builder()
+              .namingPattern("cmclearer-%d")
+              .daemon(true)
+              .build());
+      executor.scheduleAtFixedRate(new CMClearer(hiveConf.get(HiveConf.ConfVars.REPLCMDIR.varname),
+          HiveConf.getTimeVar(hiveConf, ConfVars.REPLCMRETAIN, TimeUnit.SECONDS), hiveConf),
+          0,
+          HiveConf.getTimeVar(hiveConf, ConfVars.REPLCMINTERVAL,
+              TimeUnit.SECONDS), TimeUnit.SECONDS);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/b558d49b/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java b/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java
index 6aca1b7..549ca52 100755
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java
@@ -448,11 +448,11 @@ public class Warehouse {
   }
 
   public Path getPartitionPath(Database db, String tableName,
-      LinkedHashMap<String, String> pm) throws MetaException {
+      Map<String, String> pm) throws MetaException {
     return new Path(getTablePath(db, tableName), makePartPath(pm));
   }
 
-  public Path getPartitionPath(Path tblPath, LinkedHashMap<String, String> pm)
+  public Path getPartitionPath(Path tblPath, Map<String, String> pm)
       throws MetaException {
     return new Path(tblPath, makePartPath(pm));
   }
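
For anyone trying the patch out, a minimal end-to-end sketch of the new API follows. The class name CmQuickStart and the warehouse file path are hypothetical; the configuration keys and ReplChangeManager methods are the ones added above. Note the checksum must be captured before recycling, since the source file is gone afterwards.

  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.hive.metastore.ReplChangeManager;
  import org.apache.hadoop.hive.metastore.Warehouse;

  public class CmQuickStart {
    public static void main(String[] args) throws Exception {
      HiveConf conf = new HiveConf();
      // Enable the ChangeManager and point it at a cmroot dir;
      // both keys are introduced by this patch.
      conf.setBoolean(HiveConf.ConfVars.REPLCMENABLED.varname, true);
      conf.set(HiveConf.ConfVars.REPLCMDIR.varname, "/user/hive/cmroot/");

      Warehouse wh = new Warehouse(conf);
      ReplChangeManager cm = ReplChangeManager.getInstance(conf, wh);

      // Recycle a single file: it is renamed into cmroot instead of deleted.
      // Capture the checksum signature first.
      Path someFile = new Path("/user/hive/warehouse/db1.db/t1/part"); // hypothetical path
      String signature = ReplChangeManager.getCksumString(someFile, conf);
      boolean moved = cm.recycle(someFile);

      // The recycled copy lives at a deterministic location derived from the
      // signature, so it can be located again from the original path alone.
      Path recycled = ReplChangeManager.getCMPath(someFile, conf, signature);
      System.out.println(moved + " -> " + recycled);
    }
  }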
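The retention behavior combines two of the new settings: a recycled file survives at least hive.repl.cm.retain, and the CMClearer sweeps at hive.repl.cm.interval, so the worst-case lifetime of a recycled file is roughly retain plus interval. A small sketch reading both values (the class name CmRetentionCheck is hypothetical; getTimeVar is the accessor the patch itself uses):

  import java.util.concurrent.TimeUnit;
  import org.apache.hadoop.hive.conf.HiveConf;

  public class CmRetentionCheck {
    public static void main(String[] args) {
      HiveConf conf = new HiveConf();
      // Defaults from the patch: hive.repl.cm.retain=24h, hive.repl.cm.interval=3600s
      long retainSec = HiveConf.getTimeVar(conf, HiveConf.ConfVars.REPLCMRETAIN, TimeUnit.SECONDS);
      long intervalSec = HiveConf.getTimeVar(conf, HiveConf.ConfVars.REPLCMINTERVAL, TimeUnit.SECONDS);
      // With defaults, a recycled file is kept for 86400s and the clearer wakes
      // up every 3600s, so it may linger up to retain + interval before removal.
      System.out.println("retain=" + retainSec + "s, sweep every " + intervalSec
          + "s, worst-case lifetime=" + (retainSec + intervalSec) + "s");
    }
  }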
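A note on the naming scheme: getCMPath prefixes the file name with its checksum, so a file with the same name and content always maps to the same cmroot entry. A failed rename onto an existing entry is therefore treated by recycle() as benign deduplication rather than an error. And since cmroot is created with permission 700 and recycled files are chowned to the user the metastore runs as, the recycled data is in effect readable only by the metastore.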