Repository: hive

Updated Branches:
  refs/heads/branch-3 cc0ef469c -> ea18769f0
  refs/heads/master fc425933e -> 0dec5952a
HIVE-19124 : implement a basic major compactor for MM tables (Sergey Shelukhin, reviewed by Eugene Koifman and Gopal Vijayaraghavan)

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0dec5952
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0dec5952
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0dec5952

Branch: refs/heads/master
Commit: 0dec5952aaacefb711d69e0e40f8da389c073d5a
Parents: fc42593
Author: sergey <ser...@apache.org>
Authored: Thu Apr 26 17:19:33 2018 -0700
Committer: sergey <ser...@apache.org>
Committed: Thu Apr 26 17:19:33 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   3 +
 .../hive/ql/txn/compactor/TestCompactor.java    | 261 ++++++++++++--
 .../java/org/apache/hadoop/hive/ql/Driver.java  |  28 +-
 .../org/apache/hadoop/hive/ql/exec/DDLTask.java |   3 +-
 .../hadoop/hive/ql/lockmgr/DbTxnManager.java    |  19 +-
 .../hadoop/hive/ql/lockmgr/DummyTxnManager.java |   2 -
 .../hadoop/hive/ql/lockmgr/HiveTxnManager.java  |   3 +-
 .../apache/hadoop/hive/ql/plan/PlanUtils.java   |   3 +
 .../hive/ql/txn/compactor/CompactorMR.java      | 337 ++++++++++++++++++-
 .../hadoop/hive/ql/txn/compactor/Initiator.java |  25 +-
 .../hadoop/hive/ql/txn/compactor/Worker.java    |   6 +-
 .../hive/ql/TestTxnCommandsForMmTable.java      |  73 +---
 .../metastore/api/hive_metastoreConstants.java  |   7 +-
 .../hive/metastore/HiveMetaStoreClient.java     |   6 +-
 .../hadoop/hive/metastore/IMetaStoreClient.java |   4 +-
 .../hadoop/hive/metastore/txn/TxnUtils.java     |  20 +-
 .../HiveMetaStoreClientPreCatalog.java          |   4 +-
 .../hive/common/ValidReaderWriteIdList.java     |   4 +
 18 files changed, 654 insertions(+), 154 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/0dec5952/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index f40c606..049a594 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2531,6 +2531,9 @@ public class HiveConf extends Configuration {
         new TimeValidator(TimeUnit.MILLISECONDS), "Time between runs of the cleaner thread"),
     COMPACTOR_JOB_QUEUE("hive.compactor.job.queue", "", "Used to specify name of Hadoop queue to which\n" +
         "Compaction jobs will be submitted. Set to empty string to let Hadoop choose the queue."),
+
+    HIVE_COMPACTOR_COMPACT_MM("hive.compactor.compact.insert.only", true,
+        "Whether the compactor should compact insert-only tables.
A safety switch."), /** * @deprecated Use MetastoreConf.COMPACTOR_HISTORY_RETENTION_SUCCEEDED */ http://git-wip-us.apache.org/repos/asf/hive/blob/0dec5952/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java index 82ba775..4ebd096 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.txn.compactor; import java.io.File; +import java.io.FileNotFoundException; import java.io.FileWriter; import java.io.IOException; import java.util.ArrayList; @@ -30,11 +31,13 @@ import java.util.TreeSet; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.curator.shaded.com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.cli.CliSessionState; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; @@ -45,9 +48,11 @@ import org.apache.hadoop.hive.metastore.api.CompactionRequest; import org.apache.hadoop.hive.metastore.api.CompactionType; import org.apache.hadoop.hive.metastore.api.LongColumnStatsData; import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.StringColumnStatsData; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; @@ -704,16 +709,7 @@ public class TestCompactor { // it has an open txn in it writeBatch(connection, writer, true); - // Now, compact - TxnStore txnHandler = TxnUtils.getTxnStore(conf); - txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR)); - Worker t = new Worker(); - t.setThreadId((int) t.getId()); - t.setConf(conf); - AtomicBoolean stop = new AtomicBoolean(true); - AtomicBoolean looped = new AtomicBoolean(); - t.init(stop, looped); - t.run(); + runMajorCompaction(dbName, tblName); // Find the location of the table IMetaStoreClient msClient = new HiveMetaStoreClient(conf); @@ -827,16 +823,7 @@ public class TestCompactor { txnBatch.abort(); - // Now, compact - TxnStore txnHandler = TxnUtils.getTxnStore(conf); - txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR)); - Worker t = new Worker(); - t.setThreadId((int) t.getId()); - t.setConf(conf); - AtomicBoolean stop = new AtomicBoolean(true); - AtomicBoolean looped = new AtomicBoolean(); - t.init(stop, looped); - t.run(); + runMajorCompaction(dbName, tblName); // Find the location of the table IMetaStoreClient msClient = new HiveMetaStoreClient(conf); @@ -860,6 +847,228 @@ public class TestCompactor { } } + + 
@Test + public void mmTable() throws Exception { + String dbName = "default"; + String tblName = "mm_nonpart"; + executeStatementOnDriver("drop table if exists " + tblName, driver); + executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) STORED AS ORC" + + " TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')", + driver); + IMetaStoreClient msClient = new HiveMetaStoreClient(conf); + Table table = msClient.getTable(dbName, tblName); + msClient.close(); + + executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(1, 'foo')", driver); + executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(2, 'bar')", driver); + + verifyFooBarResult(tblName, 1); + + // Check that we have two deltas. + FileSystem fs = FileSystem.get(conf); + verifyDeltaCount(table.getSd(), fs, 2); + + runMajorCompaction(dbName, tblName); + verifyFooBarResult(tblName, 1); + verifyHasBase(table.getSd(), fs, "base_0000002"); + + // Make sure we don't compact if we don't need to compact. + runMajorCompaction(dbName, tblName); + verifyFooBarResult(tblName, 1); + verifyHasBase(table.getSd(), fs, "base_0000002"); + } + + @Test + public void mmTableBucketed() throws Exception { + String dbName = "default"; + String tblName = "mm_nonpart"; + executeStatementOnDriver("drop table if exists " + tblName, driver); + executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) CLUSTERED BY (a) " + + "INTO 64 BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true', " + + "'transactional_properties'='insert_only')", driver); + IMetaStoreClient msClient = new HiveMetaStoreClient(conf); + Table table = msClient.getTable(dbName, tblName); + msClient.close(); + + executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(1, 'foo')", driver); + executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(2, 'bar')", driver); + + verifyFooBarResult(tblName, 1); + + // Check that we have two deltas. + FileSystem fs = FileSystem.get(conf); + verifyDeltaCount(table.getSd(), fs, 2); + + runMajorCompaction(dbName, tblName); + verifyFooBarResult(tblName, 1); + String baseDir = "base_0000002"; + verifyHasBase(table.getSd(), fs, baseDir); + + FileStatus[] files = fs.listStatus(new Path(table.getSd().getLocation(), baseDir), + AcidUtils.hiddenFileFilter); + Assert.assertEquals(Lists.newArrayList(files).toString(), 64, files.length); + } + + @Test + public void mmTableOpenWriteId() throws Exception { + String dbName = "default"; + String tblName = "mm_nonpart"; + executeStatementOnDriver("drop table if exists " + tblName, driver); + executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) STORED AS TEXTFILE" + + " TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')", + driver); + IMetaStoreClient msClient = new HiveMetaStoreClient(conf); + Table table = msClient.getTable(dbName, tblName); + msClient.close(); + + executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(1, 'foo')", driver); + executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(2, 'bar')", driver); + + verifyFooBarResult(tblName, 1); + + long openTxnId = msClient.openTxn("test"); + long openWriteId = msClient.allocateTableWriteId(openTxnId, dbName, tblName); + Assert.assertEquals(3, openWriteId); // Just check to make sure base_5 below is not new. 
+ + executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(1, 'foo')", driver); + executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(2, 'bar')", driver); + + verifyFooBarResult(tblName, 2); + + runMajorCompaction(dbName, tblName); // Don't compact 4 and 5; 3 is opened. + FileSystem fs = FileSystem.get(conf); + verifyHasBase(table.getSd(), fs, "base_0000002"); + verifyDirCount(table.getSd(), fs, 1, AcidUtils.baseFileFilter); + verifyFooBarResult(tblName, 2); + + runCleaner(conf); + verifyHasDir(table.getSd(), fs, "delta_0000004_0000004_0000", AcidUtils.deltaFileFilter); + verifyHasDir(table.getSd(), fs, "delta_0000005_0000005_0000", AcidUtils.deltaFileFilter); + verifyFooBarResult(tblName, 2); + + msClient.abortTxns(Lists.newArrayList(openTxnId)); // Now abort 3. + runMajorCompaction(dbName, tblName); // Compact 4 and 5. + verifyFooBarResult(tblName, 2); + verifyHasBase(table.getSd(), fs, "base_0000005"); + runCleaner(conf); + verifyDeltaCount(table.getSd(), fs, 0); + } + + private void verifyHasBase( + StorageDescriptor sd, FileSystem fs, String baseName) throws Exception { + verifyHasDir(sd, fs, baseName, AcidUtils.baseFileFilter); + } + + private void verifyHasDir( + StorageDescriptor sd, FileSystem fs, String name, PathFilter filter) throws Exception { + FileStatus[] stat = fs.listStatus(new Path(sd.getLocation()), filter); + for (FileStatus file : stat) { + if (name.equals(file.getPath().getName())) return; + } + Assert.fail("Cannot find " + name + ": " + Arrays.toString(stat)); + } + + private void verifyDeltaCount( + StorageDescriptor sd, FileSystem fs, int count) throws Exception { + verifyDirCount(sd, fs, count, AcidUtils.deltaFileFilter); + } + + private void verifyDirCount( + StorageDescriptor sd, FileSystem fs, int count, PathFilter filter) throws Exception { + FileStatus[] stat = fs.listStatus(new Path(sd.getLocation()), filter); + Assert.assertEquals(Arrays.toString(stat), count, stat.length); + } + + @Test + public void mmTablePartitioned() throws Exception { + String dbName = "default"; + String tblName = "mm_part"; + executeStatementOnDriver("drop table if exists " + tblName, driver); + executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " + + " PARTITIONED BY(ds int) STORED AS TEXTFILE" + + " TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')", + driver); + + executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(1, 'foo', 1)", driver); + executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(1, 'foo', 1)", driver); + executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(2, 'bar', 1)", driver); + executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(1, 'foo', 2)", driver); + executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(2, 'bar', 2)", driver); + executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(2, 'bar', 3)", driver); + + verifyFooBarResult(tblName, 3); + + IMetaStoreClient msClient = new HiveMetaStoreClient(conf); + Partition p1 = msClient.getPartition(dbName, tblName, "ds=1"), + p2 = msClient.getPartition(dbName, tblName, "ds=2"), + p3 = msClient.getPartition(dbName, tblName, "ds=3"); + msClient.close(); + + FileSystem fs = FileSystem.get(conf); + verifyDeltaCount(p1.getSd(), fs, 3); + verifyDeltaCount(p2.getSd(), fs, 2); + verifyDeltaCount(p3.getSd(), fs, 1); + + runMajorCompaction(dbName, tblName, "ds=1", "ds=2", "ds=3"); + + verifyFooBarResult(tblName, 3); 
+ verifyDeltaCount(p3.getSd(), fs, 1); + verifyHasBase(p1.getSd(), fs, "base_0000006"); + verifyHasBase(p2.getSd(), fs, "base_0000006"); + + executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(1, 'foo', 2)", driver); + executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(2, 'bar', 2)", driver); + + runMajorCompaction(dbName, tblName, "ds=1", "ds=2", "ds=3"); + + // Make sure we don't compact if we don't need to compact; but do if we do. + verifyFooBarResult(tblName, 4); + verifyDeltaCount(p3.getSd(), fs, 1); + verifyHasBase(p1.getSd(), fs, "base_0000006"); + verifyHasBase(p2.getSd(), fs, "base_0000008"); + + } + + private void verifyFooBarResult(String tblName, int count) throws Exception, IOException { + List<String> valuesReadFromHiveDriver = new ArrayList<String>(); + executeStatementOnDriver("SELECT a,b FROM " + tblName, driver); + driver.getResults(valuesReadFromHiveDriver); + Assert.assertEquals(2 * count, valuesReadFromHiveDriver.size()); + int fooCount = 0, barCount = 0; + for (String s : valuesReadFromHiveDriver) { + if ("1\tfoo".equals(s)) { + ++fooCount; + } else if ("2\tbar".equals(s)) { + ++barCount; + } else { + Assert.fail("Unexpected " + s); + } + } + Assert.assertEquals(fooCount, count); + Assert.assertEquals(barCount, count); + } + + private void runMajorCompaction( + String dbName, String tblName, String... partNames) throws MetaException { + TxnStore txnHandler = TxnUtils.getTxnStore(conf); + Worker t = new Worker(); + t.setThreadId((int) t.getId()); + t.setConf(conf); + t.init(new AtomicBoolean(true), new AtomicBoolean()); + if (partNames.length == 0) { + txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR)); + t.run(); + } else { + for (String partName : partNames) { + CompactionRequest cr = new CompactionRequest(dbName, tblName, CompactionType.MAJOR); + cr.setPartitionname(partName); + txnHandler.compact(cr); + t.run(); + } + } + } + @Test public void majorCompactWhileStreamingForSplitUpdate() throws Exception { String dbName = "default"; @@ -885,16 +1094,7 @@ public class TestCompactor { // Start a third batch, but don't close it. writeBatch(connection, writer, true); - // Now, compact - TxnStore txnHandler = TxnUtils.getTxnStore(conf); - txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR)); - Worker t = new Worker(); - t.setThreadId((int) t.getId()); - t.setConf(conf); - AtomicBoolean stop = new AtomicBoolean(true); - AtomicBoolean looped = new AtomicBoolean(); - t.init(stop, looped); - t.run(); + runMajorCompaction(dbName, tblName); // Find the location of the table IMetaStoreClient msClient = new HiveMetaStoreClient(conf); @@ -1461,6 +1661,7 @@ public class TestCompactor { throw new IOException("Failed to execute \"" + cmd + "\". 
Driver returned: " + cpr); } } + static void createTestDataFile(String filename, String[] lines) throws IOException { FileWriter writer = null; try { http://git-wip-us.apache.org/repos/asf/hive/blob/0dec5952/ql/src/java/org/apache/hadoop/hive/ql/Driver.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index f83bdaf..41ad002 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -46,6 +46,8 @@ import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.hive.common.JavaUtils; +import org.apache.hadoop.hive.common.ValidCompactorWriteIdList; +import org.apache.hadoop.hive.common.ValidReaderWriteIdList; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidTxnWriteIdList; import org.apache.hadoop.hive.common.ValidWriteIdList; @@ -61,6 +63,8 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Schema; +import org.apache.hadoop.hive.metastore.api.TableValidWriteIds; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.ql.cache.results.CacheUsage; import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache; import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache.CacheEntry; @@ -209,6 +213,7 @@ public class Driver implements IDriver { private CacheUsage cacheUsage; private CacheEntry usedCacheEntry; + private ValidWriteIdList compactionWriteIds = null; private enum DriverState { INITIALIZED, @@ -540,8 +545,10 @@ public class Driver implements IDriver { conf.setQueryString(queryStr); // FIXME: sideeffect will leave the last query set at the session level - SessionState.get().getConf().setQueryString(queryStr); - SessionState.get().setupQueryCurrentTimestamp(); + if (SessionState.get() != null) { + SessionState.get().getConf().setQueryString(queryStr); + SessionState.get().setupQueryCurrentTimestamp(); + } // Whether any error occurred during query compilation. Used for query lifetime hook. boolean compileError = false; @@ -1311,7 +1318,18 @@ public class Driver implements IDriver { throw new IllegalStateException("calling recordValidWritsIdss() without initializing ValidTxnList " + JavaUtils.txnIdToString(txnMgr.getCurrentTxnId())); } - ValidTxnWriteIdList txnWriteIds = txnMgr.getValidWriteIds(getTransactionalTableList(plan), txnString); + List<String> txnTables = getTransactionalTableList(plan); + ValidTxnWriteIdList txnWriteIds = null; + if (compactionWriteIds != null) { + if (txnTables.size() != 1) { + throw new LockException("Unexpected tables in compaction: " + txnTables); + } + String fullTableName = txnTables.get(0); + txnWriteIds = new ValidTxnWriteIdList(0L); // No transaction for the compaction for now. 
+ txnWriteIds.addTableValidWriteIdList(compactionWriteIds); + } else { + txnWriteIds = txnMgr.getValidWriteIds(txnTables, txnString); + } String writeIdStr = txnWriteIds.toString(); conf.set(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY, writeIdStr); if (plan.getFetchTask() != null) { @@ -2776,4 +2794,8 @@ public class Driver implements IDriver { return false; } } + + public void setCompactionWriteIds(ValidWriteIdList val) { + this.compactionWriteIds = val; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/0dec5952/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index 15e6c34..3141a7e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -2724,7 +2724,8 @@ public class DDLTask extends Task<DDLWork> implements Serializable { return prop_string; } - private StringBuilder appendSerdeParams(StringBuilder builder, Map<String, String> serdeParam) { + public static StringBuilder appendSerdeParams( + StringBuilder builder, Map<String, String> serdeParam) { serdeParam = new TreeMap<String, String>(serdeParam); builder.append("WITH SERDEPROPERTIES ( \n"); List<String> serdeCols = new ArrayList<String>(); http://git-wip-us.apache.org/repos/asf/hive/blob/0dec5952/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index 7b7fd5d..76569d5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hive.ql.lockmgr; import com.google.common.annotations.VisibleForTesting; + +import org.apache.curator.shaded.com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.ql.io.AcidUtils; @@ -31,12 +33,14 @@ import org.apache.hive.common.util.ShutdownHookManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.common.JavaUtils; +import org.apache.hadoop.hive.common.ValidCompactorWriteIdList; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidTxnWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.LockComponentBuilder; import org.apache.hadoop.hive.metastore.LockRequestBuilder; import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryPlan; @@ -69,7 +73,7 @@ import java.util.concurrent.atomic.AtomicReference; * with a single thread accessing it at a time, with the exception of {@link #heartbeat()} method. * The later may (usually will) be called from a timer thread. * See {@link #getMS()} for more important concurrency/metastore access notes. - * + * * Each statement that the TM (transaction manager) should be aware of should belong to a transaction. * Effectively, that means any statement that has side effects. 
Exceptions are statements like * Show Compactions, Show Tables, Use Database foo, etc. The transaction is started either @@ -111,9 +115,9 @@ public final class DbTxnManager extends HiveTxnManagerImpl { /** * if {@code true} it means current transaction is started via START TRANSACTION which means it cannot * include any Operations which cannot be rolled back (drop partition; write to non-acid table). - * If false, it's a single statement transaction which can include any statement. This is not a + * If false, it's a single statement transaction which can include any statement. This is not a * contradiction from the user point of view who doesn't know anything about the implicit txn - * and cannot call rollback (the statement of course can fail in which case there is nothing to + * and cannot call rollback (the statement of course can fail in which case there is nothing to * rollback (assuming the statement is well implemented)). * * This is done so that all commands run in a transaction which simplifies implementation and @@ -292,7 +296,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl { /** * Ensures that the current SQL statement is appropriate for the current state of the * Transaction Manager (e.g. can call commit unless you called start transaction) - * + * * Note that support for multi-statement txns is a work-in-progress so it's only supported in * HiveConf#HIVE_IN_TEST/HiveConf#TEZ_HIVE_IN_TEST. * @param queryPlan @@ -300,7 +304,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl { */ private void verifyState(QueryPlan queryPlan) throws LockException { if(!isTxnOpen()) { - throw new LockException("No transaction context for operation: " + queryPlan.getOperationName() + + throw new LockException("No transaction context for operation: " + queryPlan.getOperationName() + " for " + getQueryIdWaterMark(queryPlan)); } if(queryPlan.getOperation() == null) { @@ -820,7 +824,8 @@ public final class DbTxnManager extends HiveTxnManagerImpl { assert isTxnOpen(); assert validTxnList != null && !validTxnList.isEmpty(); try { - return getMS().getValidWriteIds(txnId, tableList, validTxnList); + return TxnUtils.createValidTxnWriteIdList( + txnId, getMS().getValidWriteIds(tableList, validTxnList)); } catch (TException e) { throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e); } @@ -1013,7 +1018,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl { throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e); } } - + public void replAllocateTableWriteIdsBatch(String dbName, String tableName, String replPolicy, List<TxnToWriteId> srcTxnToWriteIdList) throws LockException { try { http://git-wip-us.apache.org/repos/asf/hive/blob/0dec5952/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java index 78eedd3..a74670b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java @@ -394,6 +394,4 @@ class DummyTxnManager extends HiveTxnManagerImpl { } return locks; } - - } http://git-wip-us.apache.org/repos/asf/hive/blob/0dec5952/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java ---------------------------------------------------------------------- diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java index ec11fec..f239535 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hive.ql.plan.LockDatabaseDesc; import org.apache.hadoop.hive.ql.plan.LockTableDesc; import org.apache.hadoop.hive.ql.plan.UnlockDatabaseDesc; import org.apache.hadoop.hive.ql.plan.UnlockTableDesc; + import java.util.List; /** @@ -253,7 +254,7 @@ public interface HiveTxnManager { boolean recordSnapshot(QueryPlan queryPlan); boolean isImplicitTransactionOpen(); - + boolean isTxnOpen(); /** * if {@code isTxnOpen()}, returns the currently active transaction ID. http://git-wip-us.apache.org/repos/asf/hive/blob/0dec5952/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java index dde20ed..056dfa4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java @@ -110,6 +110,9 @@ public final class PlanUtils { public static TableDesc getDefaultTableDesc(CreateTableDesc directoryDesc, String cols, String colTypes ) { + // TODO: this should have an option for directory to inherit from the parent table, + // including bucketing and list bucketing, for the use in compaction when the + // latter runs inside a transaction. TableDesc ret = getDefaultTableDesc(Integer.toString(Utilities.ctrlaCode), cols, colTypes, false);; if (directoryDesc == null) { http://git-wip-us.apache.org/repos/asf/hive/blob/0dec5952/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index b1c2288..b698c84 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -17,13 +17,21 @@ */ package org.apache.hadoop.hive.ql.txn.compactor; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.util.HashSet; + +import com.google.common.collect.Lists; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.regex.Matcher; @@ -33,24 +41,43 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.JavaUtils; +import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.common.StringableMap; import org.apache.hadoop.hive.common.ValidCompactorWriteIdList; +import org.apache.hadoop.hive.common.ValidReaderWriteIdList; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; import org.apache.hadoop.hive.metastore.api.CompactionType; import 
org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Order; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.SkewedInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.QueryState; +import org.apache.hadoop.hive.ql.exec.DDLTask; import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; import org.apache.hadoop.hive.ql.io.AcidInputFormat; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.AcidUtils.Directory; import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; +import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId; import org.apache.hadoop.hive.shims.ShimLoader; @@ -70,7 +97,9 @@ import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.RunningJob; import org.apache.hadoop.mapred.TaskAttemptContext; import org.apache.hadoop.mapred.lib.NullOutputFormat; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; +import org.apache.hive.common.util.HiveStringUtils; import org.apache.hive.common.util.Ref; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -128,7 +157,7 @@ public class CompactorMR { } job.set(FINAL_LOCATION, sd.getLocation()); - job.set(TMP_LOCATION, sd.getLocation() + "/" + TMPDIR + "_" + UUID.randomUUID().toString()); + job.set(TMP_LOCATION, generateTmpPath(sd)); job.set(INPUT_FORMAT_CLASS_NAME, sd.getInputFormat()); job.set(OUTPUT_FORMAT_CLASS_NAME, sd.getOutputFormat()); job.setBoolean(IS_COMPRESSED, sd.isCompressed()); @@ -200,19 +229,17 @@ public class CompactorMR { * @param ci CompactionInfo * @throws java.io.IOException if the job fails */ - void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd, ValidWriteIdList writeIds, + void run(HiveConf conf, String jobName, Table t, Partition p, StorageDescriptor sd, ValidWriteIdList writeIds, CompactionInfo ci, Worker.StatsUpdater su, TxnStore txnHandler) throws IOException { if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION)) { throw new RuntimeException(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION.name() + "=true"); } - // For MM tables we don't need to launch MR jobs as there is no compaction needed. - // We just need to delete the directories for aborted transactions. if (AcidUtils.isInsertOnlyTable(t.getParameters())) { - LOG.debug("Going to delete directories for aborted transactions for MM table " - + t.getDbName() + "." 
+ t.getTableName()); - removeFiles(conf, sd.getLocation(), writeIds, t); + if (HiveConf.getBoolVar(conf, ConfVars.HIVE_COMPACTOR_COMPACT_MM)) { + runMmCompaction(conf, t, p, sd, writeIds, ci); + } return; } @@ -294,6 +321,255 @@ public class CompactorMR { su.gatherStats(); } + private void runMmCompaction(HiveConf conf, Table t, Partition p, + StorageDescriptor sd, ValidWriteIdList writeIds, CompactionInfo ci) throws IOException { + LOG.debug("Going to delete directories for aborted transactions for MM table " + + t.getDbName() + "." + t.getTableName()); + AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(sd.getLocation()), + conf, writeIds, Ref.from(false), false, t.getParameters()); + removeFilesForMmTable(conf, dir); + + // Then, actually do the compaction. + if (!ci.isMajorCompaction()) { + // Not supported for MM tables right now. + LOG.info("Not compacting " + sd.getLocation() + "; not a major compaction"); + return; + } + + int deltaCount = dir.getCurrentDirectories().size(); + if ((deltaCount + (dir.getBaseDirectory() == null ? 0 : 1)) <= 1) { + LOG.debug("Not compacting " + sd.getLocation() + "; current base is " + + dir.getBaseDirectory() + " and there are " + deltaCount + " deltas"); + return; + } + try { + String tmpLocation = generateTmpPath(sd); + Path baseLocation = new Path(tmpLocation, "_base"); + + // Set up the session for driver. + conf = new HiveConf(conf); + conf.set(ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column"); + + String user = UserGroupInformation.getCurrentUser().getShortUserName(); + SessionState sessionState = setUpSessionState(conf, user); + + // Note: we could skip creating the table and just add table type stuff directly to the + // "insert overwrite directory" command if there were no bucketing or list bucketing. + String tmpPrefix = t.getDbName() + ".tmp_compactor_" + t.getTableName() + "_", tmpTableName; + while (true) { + tmpTableName = tmpPrefix + System.currentTimeMillis(); + String query = buildMmCompactionCtQuery(tmpTableName, t, + p == null ? t.getSd() : p.getSd(), baseLocation.toString()); + LOG.info("Compacting a MM table into " + query); + try { + runOnDriver(conf, user, sessionState, query, null); + break; + } catch (Exception ex) { + Throwable cause = ex; + while (cause != null && !(cause instanceof AlreadyExistsException)) { + cause = cause.getCause(); + } + if (cause == null) { + throw new IOException(ex); + } + } + } + + String query = buildMmCompactionQuery(conf, t, p, tmpTableName); + LOG.info("Compacting a MM table via " + query); + runOnDriver(conf, user, sessionState, query, writeIds); + commitMmCompaction(tmpLocation, sd.getLocation(), conf, writeIds); + runOnDriver(conf, user, sessionState, "drop table if exists " + tmpTableName, null); + } catch (HiveException e) { + LOG.error("Error compacting a MM table", e); + throw new IOException(e); + } + } + + public SessionState setUpSessionState(HiveConf conf, String user) { + SessionState sessionState = SessionState.get(); + if (sessionState == null) { + // Note: we assume that workers run on the same threads repeatedly, so we can set up + // the session here and it will be reused without explicitly storing in the worker. 
+ sessionState = new SessionState(conf, user); + SessionState.setCurrentSessionState(sessionState); + } + return sessionState; + } + + private String generateTmpPath(StorageDescriptor sd) { + return sd.getLocation() + "/" + TMPDIR + "_" + UUID.randomUUID().toString(); + } + + private String buildMmCompactionCtQuery( + String fullName, Table t, StorageDescriptor sd, String location) { + StringBuilder query = new StringBuilder("create temporary table ") + .append(fullName).append("("); + List<FieldSchema> cols = t.getSd().getCols(); + boolean isFirst = true; + for (FieldSchema col : cols) { + if (!isFirst) { + query.append(", "); + } + isFirst = false; + query.append("`").append(col.getName()).append("` ").append(col.getType()); + } + query.append(") "); + + // Bucketing. + List<String> buckCols = t.getSd().getBucketCols(); + if (buckCols.size() > 0) { + query.append("CLUSTERED BY (").append(StringUtils.join(",", buckCols)).append(") "); + List<Order> sortCols = t.getSd().getSortCols(); + if (sortCols.size() > 0) { + query.append("SORTED BY ("); + List<String> sortKeys = new ArrayList<String>(); + isFirst = true; + for (Order sortCol : sortCols) { + if (!isFirst) { + query.append(", "); + } + isFirst = false; + query.append(sortCol.getCol()).append(" "); + if (sortCol.getOrder() == BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_ASC) { + query.append("ASC"); + } else if (sortCol.getOrder() == BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_DESC) { + query.append("DESC"); + } + } + query.append(") "); + } + query.append("INTO ").append(t.getSd().getNumBuckets()).append(" BUCKETS"); + } + + // Stored as directories. We don't care about the skew otherwise. + if (t.getSd().isStoredAsSubDirectories()) { + SkewedInfo skewedInfo = t.getSd().getSkewedInfo(); + if (skewedInfo != null && !skewedInfo.getSkewedColNames().isEmpty()) { + query.append(" SKEWED BY (").append( + StringUtils.join(", ", skewedInfo.getSkewedColNames())).append(") ON "); + isFirst = true; + for (List<String> colValues : skewedInfo.getSkewedColValues()) { + if (!isFirst) { + query.append(", "); + } + isFirst = false; + query.append("('").append(StringUtils.join("','", colValues)).append("')"); + } + query.append(") STORED AS DIRECTORIES"); + } + } + + SerDeInfo serdeInfo = sd.getSerdeInfo(); + Map<String, String> serdeParams = serdeInfo.getParameters(); + query.append(" ROW FORMAT SERDE '").append(HiveStringUtils.escapeHiveCommand( + serdeInfo.getSerializationLib())).append("'"); + String sh = t.getParameters().get(hive_metastoreConstants.META_TABLE_STORAGE); + assert sh == null; // Not supposed to be a compactable table. + if (!serdeParams.isEmpty()) { + DDLTask.appendSerdeParams(query, serdeParams); + } + query.append("STORED AS INPUTFORMAT '").append( + HiveStringUtils.escapeHiveCommand(sd.getInputFormat())).append("' OUTPUTFORMAT '").append( + HiveStringUtils.escapeHiveCommand(sd.getOutputFormat())).append("' LOCATION '").append( + HiveStringUtils.escapeHiveCommand(location)).append("' TBLPROPERTIES ("); + // Exclude all standard table properties. 
+ Set<String> excludes = getHiveMetastoreConstants(); + excludes.addAll(Lists.newArrayList(StatsSetupConst.TABLE_PARAMS_STATS_KEYS)); + isFirst = true; + for (Map.Entry<String, String> e : t.getParameters().entrySet()) { + if (e.getValue() == null) continue; + if (excludes.contains(e.getKey())) continue; + if (!isFirst) { + query.append(", "); + } + isFirst = false; + query.append("'").append(e.getKey()).append("'='").append( + HiveStringUtils.escapeHiveCommand(e.getValue())).append("'"); + } + if (!isFirst) { + query.append(", "); + } + query.append("'transactional'='false')"); + return query.toString(); + + } + + private static Set<String> getHiveMetastoreConstants() { + HashSet<String> result = new HashSet<>(); + for (Field f : hive_metastoreConstants.class.getDeclaredFields()) { + if (!Modifier.isStatic(f.getModifiers())) continue; + if (!Modifier.isFinal(f.getModifiers())) continue; + if (!String.class.equals(f.getType())) continue; + f.setAccessible(true); + try { + result.add((String)f.get(null)); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } + } + return result; + } + + private void runOnDriver(HiveConf conf, String user, + SessionState sessionState, String query, ValidWriteIdList writeIds) throws HiveException { + boolean isOk = false; + try { + QueryState qs = new QueryState.Builder().withHiveConf(conf).nonIsolated().build(); + Driver driver = new Driver(qs, user, null, null); + driver.setCompactionWriteIds(writeIds); + try { + CommandProcessorResponse cpr = driver.run(query); + if (cpr.getResponseCode() != 0) { + LOG.error("Failed to run " + query, cpr.getException()); + throw new HiveException("Failed to run " + query, cpr.getException()); + } + } finally { + driver.close(); + driver.destroy(); + } + isOk = true; + } finally { + if (!isOk) { + try { + sessionState.close(); // This also resets SessionState.get. + } catch (Throwable th) { + LOG.warn("Failed to close a bad session", th); + SessionState.detachSession(); + } + } + } + } + + private String buildMmCompactionQuery(HiveConf conf, Table t, Partition p, String tmpName) { + String fullName = t.getDbName() + "." + t.getTableName(); + // TODO: ideally we should make a special form of insert overwrite so that we: + // 1) Could use fast merge path for ORC and RC. + // 2) Didn't have to create a table. + + String query = "insert overwrite table " + tmpName + " "; + String filter = ""; + if (p != null) { + filter = " where "; + List<String> vals = p.getValues(); + List<FieldSchema> keys = t.getPartitionKeys(); + assert keys.size() == vals.size(); + for (int i = 0; i < keys.size(); ++i) { + filter += (i == 0 ? "`" : " and `") + (keys.get(i).getName() + "`='" + vals.get(i) + "'"); + } + query += " select "; + // Use table descriptor for columns. + List<FieldSchema> cols = t.getSd().getCols(); + for (int i = 0; i < cols.size(); ++i) { + query += (i == 0 ? "`" : ", `") + (cols.get(i).getName() + "`"); + } + } else { + query += "select *"; + } + query += " from " + fullName + filter; + return query; + } + /** * @param baseDir if not null, it's either table/partition root folder or base_xxxx. * If it's base_xxxx, it's in dirsToSearch, else the actual original files @@ -309,6 +585,10 @@ public class CompactorMR { dirsToSearch = new StringableList(); } StringableList deltaDirs = new StringableList(); + // Note: if compaction creates a delta, it won't replace an existing base dir, so the txn ID + // of the base dir won't be a part of delta's range. 
If otoh compaction creates a base, + // we don't care about this value because bases don't have min txn ID in the name. + // However logically this should also take base into account if it's included. long minTxn = Long.MAX_VALUE; long maxTxn = Long.MIN_VALUE; for (AcidUtils.ParsedDelta delta : parsedDeltas) { @@ -356,7 +636,7 @@ public class CompactorMR { * to use. * @param job the job to update * @param cols the columns of the table - * @param map + * @param map */ private void setColumnTypes(JobConf job, List<FieldSchema> cols) { StringBuilder colNames = new StringBuilder(); @@ -378,10 +658,7 @@ public class CompactorMR { } // Remove the directories for aborted transactions only - private void removeFiles(HiveConf conf, String location, ValidWriteIdList writeIdList, Table t) - throws IOException { - AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(location), conf, writeIdList, - Ref.from(false), false, t.getParameters()); + private void removeFilesForMmTable(HiveConf conf, Directory dir) throws IOException { // For MM table, we only want to delete delta dirs for aborted txns. List<FileStatus> abortedDirs = dir.getAbortedDirectories(); List<Path> filesToDelete = new ArrayList<>(abortedDirs.size()); @@ -389,11 +666,9 @@ public class CompactorMR { filesToDelete.add(stat.getPath()); } if (filesToDelete.size() < 1) { - LOG.warn("Hmm, nothing to delete in the worker for directory " + location + - ", that hardly seems right."); return; } - LOG.info("About to remove " + filesToDelete.size() + " aborted directories from " + location); + LOG.info("About to remove " + filesToDelete.size() + " aborted directories from " + dir); FileSystem fs = filesToDelete.get(0).getFileSystem(conf); for (Path dead : filesToDelete) { LOG.debug("Going to delete path " + dead.toString()); @@ -940,6 +1215,7 @@ public class CompactorMR { FileStatus[] contents = fs.listStatus(tmpLocation);//expect 1 base or delta dir in this list //we have MIN_TXN, MAX_TXN and IS_MAJOR in JobConf so we could figure out exactly what the dir //name is that we want to rename; leave it for another day + // TODO: if we expect one dir why don't we enforce it? for (FileStatus fileStatus : contents) { //newPath is the base/delta dir Path newPath = new Path(finalLocation, fileStatus.getPath().getName()); @@ -969,4 +1245,35 @@ public class CompactorMR { fs.delete(tmpLocation, true); } } + + /** + * Note: similar logic to the main committer; however, no ORC versions and stuff like that. + * @param from The temp directory used for compactor output. Not the actual base/delta. + * @param to The final directory; basically a SD directory. Not the actual base/delta. + */ + private void commitMmCompaction(String from, String to, Configuration conf, + ValidWriteIdList actualWriteIds) throws IOException { + Path fromPath = new Path(from), toPath = new Path(to); + FileSystem fs = fromPath.getFileSystem(conf); + // Assume the high watermark can be used as maximum transaction ID. + long maxTxn = actualWriteIds.getHighWatermark(); + AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf) + .writingBase(true).isCompressed(false).maximumWriteId(maxTxn).bucket(0).statementId(-1); + Path newBaseDir = AcidUtils.createFilename(toPath, options).getParent(); + if (!fs.exists(fromPath)) { + LOG.info(from + " not found. Assuming 0 splits. 
Creating " + newBaseDir); + fs.mkdirs(newBaseDir); + AcidUtils.MetaDataFile.createCompactorMarker(toPath, fs); + return; + } + LOG.info("Moving contents of " + from + " to " + to); + FileStatus[] children = fs.listStatus(fromPath); + if (children.length != 1) { + throw new IOException("Unexpected files in the source: " + Arrays.toString(children)); + } + FileStatus dirPath = children[0]; + fs.rename(dirPath.getPath(), newBaseDir); + AcidUtils.MetaDataFile.createCompactorMarker(newBaseDir, fs); + fs.delete(fromPath, true); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/0dec5952/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index c95daaf..a61b6e8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -240,11 +240,6 @@ public class Initiator extends CompactorThread { return CompactionType.MAJOR; } - // If it is for insert-only transactional table, return null. - if (AcidUtils.isInsertOnlyTable(tblproperties)) { - return null; - } - if (runJobAsSelf(runAs)) { return determineCompactionType(ci, writeIds, sd, tblproperties); } else { @@ -333,14 +328,20 @@ public class Initiator extends CompactorThread { HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD) : Integer.parseInt(deltaNumProp); boolean enough = deltas.size() > deltaNumThreshold; - if (enough) { - LOG.debug("Found " + deltas.size() + " delta files, threshold is " + deltaNumThreshold + - (enough ? "" : "not") + " and no base, requesting " + (noBase ? "major" : "minor") + - " compaction"); - // If there's no base file, do a major compaction - return noBase ? CompactionType.MAJOR : CompactionType.MINOR; + if (!enough) { + return null; + } + if (AcidUtils.isInsertOnlyTable(tblproperties)) { + LOG.debug("Requesting a major compaction for a MM table; found " + deltas.size() + + " delta files, threshold is " + deltaNumThreshold); + return CompactionType.MAJOR; } - return null; + // TODO: this log statement looks wrong + LOG.debug("Found " + deltas.size() + " delta files, threshold is " + deltaNumThreshold + + (enough ? "" : "not") + " and no base, requesting " + (noBase ? "major" : "minor") + + " compaction"); + // If there's no base file, do a major compaction + return noBase ? CompactionType.MAJOR : CompactionType.MINOR; } private long sumDirSize(FileSystem fs, Path dir) throws IOException { http://git-wip-us.apache.org/repos/asf/hive/blob/0dec5952/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java index fe0aaa4..7461299 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java @@ -84,6 +84,7 @@ public class Worker extends CompactorThread { // so wrap it in a big catch Throwable statement. 
try { final CompactionInfo ci = txnHandler.findNextToCompact(name); + LOG.debug("Processing compaction request " + ci); if (ci == null && !stop.get()) { try { @@ -170,14 +171,15 @@ public class Worker extends CompactorThread { launchedJob = true; try { if (runJobAsSelf(runAs)) { - mr.run(conf, jobName.toString(), t, sd, tblValidWriteIds, ci, su, txnHandler); + mr.run(conf, jobName.toString(), t, p, sd, tblValidWriteIds, ci, su, txnHandler); } else { UserGroupInformation ugi = UserGroupInformation.createProxyUser(t.getOwner(), UserGroupInformation.getLoginUser()); + final Partition fp = p; ugi.doAs(new PrivilegedExceptionAction<Object>() { @Override public Object run() throws Exception { - mr.run(conf, jobName.toString(), t, sd, tblValidWriteIds, ci, su, txnHandler); + mr.run(conf, jobName.toString(), t, fp, sd, tblValidWriteIds, ci, su, txnHandler); return null; } }); http://git-wip-us.apache.org/repos/asf/hive/blob/0dec5952/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java index c053860..f357275 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.curator.shaded.com.google.common.collect.Lists; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; @@ -200,32 +201,7 @@ public class TestTxnCommandsForMmTable extends TxnCommandsBaseForTests { Assert.assertTrue(status[i].getPath().getName().matches("delta_.*")); } - // 2. Perform a major compaction. - runStatementOnDriver("alter table "+ TableExtended.MMTBL + " compact 'MAJOR'"); - runWorker(hiveConf); - status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (TableExtended.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); - // There should be 2 delta dirs. - Assert.assertEquals(2, status.length); - boolean sawBase = false; - int deltaCount = 0; - for (int i = 0; i < status.length; i++) { - String dirName = status[i].getPath().getName(); - if (dirName.matches("delta_.*")) { - deltaCount++; - } else { - sawBase = true; - Assert.assertTrue(dirName.matches("base_.*")); - } - } - Assert.assertEquals(2, deltaCount); - Assert.assertFalse(sawBase); - // Verify query result - int [][] resultData = new int[][] {{1,2},{3,4}}; - List<String> rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b"); - Assert.assertEquals(stringifyValues(resultData), rs); - - // 3. INSERT OVERWRITE + // 2. 
INSERT OVERWRITE // Prepare data for the source table runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(5,6),(7,8)"); // Insert overwrite MM table from source table @@ -235,40 +211,12 @@ public class TestTxnCommandsForMmTable extends TxnCommandsBaseForTests { // There should be 2 delta dirs, plus 1 base dir in the location Assert.assertEquals(3, status.length); int baseCount = 0; - deltaCount = 0; - for (int i = 0; i < status.length; i++) { - String dirName = status[i].getPath().getName(); - if (dirName.matches("delta_.*")) { - deltaCount++; - } else { - baseCount++; - } - } - Assert.assertEquals(2, deltaCount); - Assert.assertEquals(1, baseCount); - - // Verify query result - resultData = new int[][] {{5,6},{7,8}}; - rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b"); - Assert.assertEquals(stringifyValues(resultData), rs); - - // 4. Perform a minor compaction. Nothing should change. - // Both deltas and the base dir should have the same name. - // Re-verify directory layout and query result by using the same logic as above - runStatementOnDriver("alter table "+ TableExtended.MMTBL + " compact 'MINOR'"); - runWorker(hiveConf); - status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (TableExtended.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); - // There should be 2 delta dirs, plus 1 base dir in the location - Assert.assertEquals(3, status.length); - baseCount = 0; - deltaCount = 0; + int deltaCount = 0; for (int i = 0; i < status.length; i++) { String dirName = status[i].getPath().getName(); if (dirName.matches("delta_.*")) { deltaCount++; } else { - Assert.assertTrue(dirName.matches("base_.*")); baseCount++; } } @@ -276,19 +224,8 @@ public class TestTxnCommandsForMmTable extends TxnCommandsBaseForTests { Assert.assertEquals(1, baseCount); // Verify query result - rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b"); - Assert.assertEquals(stringifyValues(resultData), rs); - - // 5. Run Cleaner. It should remove the 2 delta dirs. - runCleaner(hiveConf); - // There should be only 1 directory left: base_xxxxxxx. - // The delta dirs should have been cleaned up. 
- status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (TableExtended.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); - Assert.assertEquals(1, status.length); - Assert.assertTrue(status[0].getPath().getName().matches("base_.*")); - // Verify query result - rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b"); + int[][] resultData = new int[][] {{5,6},{7,8}}; + List<String> rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b"); Assert.assertEquals(stringifyValues(resultData), rs); } http://git-wip-us.apache.org/repos/asf/hive/blob/0dec5952/standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java b/standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java index cb1d40a..2b35e6f 100644 --- a/standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java +++ b/standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java @@ -6,10 +6,11 @@ */ package org.apache.hadoop.hive.metastore.api; +import java.lang.reflect.Modifier; +import java.lang.reflect.Field; import org.apache.thrift.scheme.IScheme; import org.apache.thrift.scheme.SchemeFactory; import org.apache.thrift.scheme.StandardScheme; - import org.apache.thrift.scheme.TupleScheme; import org.apache.thrift.protocol.TTupleProtocol; import org.apache.thrift.protocol.TProtocolException; @@ -34,7 +35,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@org.apache.hadoop.classification.InterfaceAudience.Public @org.apache.hadoop.classification.InterfaceStability.Stable public class hive_metastoreConstants { +@org.apache.hadoop.classification.InterfaceAudience.Public +@org.apache.hadoop.classification.InterfaceStability.Stable +public class hive_metastoreConstants { public static final String DDL_TIME = "transient_lastDdlTime"; http://git-wip-us.apache.org/repos/asf/hive/blob/0dec5952/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java index 1c8d223..1138ed3 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java @@ -2403,10 +2403,10 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable { } @Override - public ValidTxnWriteIdList getValidWriteIds(Long currentTxnId, List<String> tablesList, String validTxnList) - throws TException { + public List<TableValidWriteIds> getValidWriteIds( + List<String> tablesList, String validTxnList) throws TException { GetValidWriteIdsRequest rqst = new GetValidWriteIdsRequest(tablesList, validTxnList); - return TxnUtils.createValidTxnWriteIdList(currentTxnId, client.get_valid_write_ids(rqst)); + return client.get_valid_write_ids(rqst).getTblValidWriteIds(); } 

http://git-wip-us.apache.org/repos/asf/hive/blob/0dec5952/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
index aee416d..72b814d 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
@@ -110,6 +110,7 @@ import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
 import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.TableMeta;
+import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
 import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
 import org.apache.hadoop.hive.metastore.api.TxnOpenException;
 import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
@@ -2749,14 +2750,13 @@
 
   /**
    * Get a structure that details valid write ids list for all tables read by current txn.
-   * @param currentTxnId current txn ID for which we try to get valid write ids list
    * @param tablesList list of tables (format: <db_name>.<table_name>) read from the current transaction
    * for which needs to populate the valid write ids
    * @param validTxnList snapshot of valid txns for the current txn
    * @return list of valid write ids for the given list of tables.
    * @throws TException
    */
-  ValidTxnWriteIdList getValidWriteIds(Long currentTxnId, List<String> tablesList, String validTxnList)
+  List<TableValidWriteIds> getValidWriteIds(List<String> tablesList, String validTxnList)
       throws TException;
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/0dec5952/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index 7b02865..1880d44 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
 import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.metastore.TransactionalValidationListener;
 import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
 import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse;
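For context on the hunk that follows: ValidTxnWriteIdList is the aggregate snapshot for one reader transaction, essentially a map from fully-qualified table name to that table's ValidReaderWriteIdList. A small illustration of what createValidTxnWriteIdList assembles, with made-up ids; the 4-argument ValidReaderWriteIdList constructor (no open or aborted write ids) is assumed from the storage-api module:

    import java.util.BitSet;

    import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
    import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
    import org.apache.hadoop.hive.common.ValidWriteIdList;

    public class WriteIdListSketch {
      public static void main(String[] args) {
        // Aggregate snapshot for a hypothetical txn 42 reading one table.
        ValidTxnWriteIdList txnIds = new ValidTxnWriteIdList(42L);
        // Table mydb.mytable: high watermark 10, no open or aborted write ids
        // (4-arg constructor assumed to exist alongside the 5-arg one used below).
        txnIds.addTableValidWriteIdList(
            new ValidReaderWriteIdList("mydb.mytable", new long[0], new BitSet(), 10L));
        ValidWriteIdList tableIds = txnIds.getTableValidWriteIdList("mydb.mytable");
        System.out.println(tableIds.isWriteIdValid(9L)); // true: within the contiguous prefix
      }
    }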
@@ -80,13 +81,13 @@ public class TxnUtils {
    * {@link org.apache.hadoop.hive.common.ValidTxnWriteIdList}. This assumes that the caller intends to
    * read the files, and thus treats both open and aborted transactions as invalid.
    * @param currentTxnId current txn ID for which we get the valid write ids list
-   * @param validWriteIds valid write ids list from the metastore
+   * @param validIds valid write ids list from the metastore
    * @return a valid write IDs list for the whole transaction.
    */
   public static ValidTxnWriteIdList createValidTxnWriteIdList(Long currentTxnId,
-      GetValidWriteIdsResponse validWriteIds) {
+      List<TableValidWriteIds> validIds) {
     ValidTxnWriteIdList validTxnWriteIdList = new ValidTxnWriteIdList(currentTxnId);
-    for (TableValidWriteIds tableWriteIds : validWriteIds.getTblValidWriteIds()) {
+    for (TableValidWriteIds tableWriteIds : validIds) {
       validTxnWriteIdList.addTableValidWriteIdList(createValidReaderWriteIdList(tableWriteIds));
     }
     return validTxnWriteIdList;
@@ -155,6 +156,17 @@
     }
   }
 
+  public static ValidReaderWriteIdList updateForCompactionQuery(ValidReaderWriteIdList ids) {
+    // This is based on the existing valid write ID list that was built for a select query;
+    // therefore we assume all the aborted txns, etc. were already accounted for.
+    // All we do is adjust the high watermark to only include contiguous txns.
+    Long minOpenWriteId = ids.getMinOpenWriteId();
+    if (minOpenWriteId != null && minOpenWriteId != Long.MAX_VALUE) {
+      return ids.updateHighWatermark(ids.getMinOpenWriteId() - 1);
+    }
+    return ids;
+  }
+
   /**
    * Get an instance of the TxnStore that is appropriate for this store
    * @param conf configuration
@@ -212,7 +224,7 @@
   /**
-   * Build a query (or queries if one query is too big but only for the case of 'IN'
+   * Build a query (or queries if one query is too big but only for the case of 'IN'
    * composite clause. For the case of 'NOT IN' clauses, multiple queries change
    * the semantics of the intended query.
    * E.g., Let's assume that input "inList" parameter has [5, 6] and that

http://git-wip-us.apache.org/repos/asf/hive/blob/0dec5952/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java
index bf87cfc..8ae899f 100644
--- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java
+++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java
@@ -2140,10 +2140,10 @@ public class HiveMetaStoreClientPreCatalog implements IMetaStoreClient, AutoClos
   }
 
   @Override
-  public ValidTxnWriteIdList getValidWriteIds(Long currentTxnId, List<String> tablesList, String validTxnList)
+  public List<TableValidWriteIds> getValidWriteIds(List<String> tablesList, String validTxnList)
       throws TException {
     GetValidWriteIdsRequest rqst = new GetValidWriteIdsRequest(tablesList, validTxnList);
-    return TxnUtils.createValidTxnWriteIdList(currentTxnId, client.get_valid_write_ids(rqst));
+    return client.get_valid_write_ids(rqst).getTblValidWriteIds();
   }
 
   @Override
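The updateForCompactionQuery() helper added above is the safety core of the query-based MM compactor: a plain reader may skip individual open write ids (for example, read 8..10 while 7 is still open), but a compactor rewriting files must not emit a base that claims to contain everything up to 10 while 7 is missing, so the high watermark is pulled down to minOpenWriteId - 1 and only the contiguous committed prefix is read. A worked example of that scenario; the table name and ids are made up, and the 5-argument constructor matches the one used by updateHighWatermark in the ValidReaderWriteIdList change below:

    import java.util.BitSet;

    import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
    import org.apache.hadoop.hive.metastore.txn.TxnUtils;

    public class CompactionWatermarkSketch {
      public static void main(String[] args) {
        // Reader snapshot: high watermark 10, write id 7 still open (an exception, not aborted).
        ValidReaderWriteIdList readerIds = new ValidReaderWriteIdList(
            "mydb.mytable", new long[] { 7L }, new BitSet(), 10L, 7L);
        ValidReaderWriteIdList compactorIds = TxnUtils.updateForCompactionQuery(readerIds);
        System.out.println(compactorIds.getHighWatermark()); // 6: contiguous prefix only
      }
    }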

http://git-wip-us.apache.org/repos/asf/hive/blob/0dec5952/storage-api/src/java/org/apache/hadoop/hive/common/ValidReaderWriteIdList.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/ValidReaderWriteIdList.java b/storage-api/src/java/org/apache/hadoop/hive/common/ValidReaderWriteIdList.java
index 107ea90..95a0b56 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/common/ValidReaderWriteIdList.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/ValidReaderWriteIdList.java
@@ -250,5 +250,9 @@ public class ValidReaderWriteIdList implements ValidWriteIdList {
       return RangeResponse.SOME;
     }
   }
+
+  public ValidReaderWriteIdList updateHighWatermark(long value) {
+    return new ValidReaderWriteIdList(tableName, exceptions, abortedBits, value, minOpenWriteId);
+  }
 }
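Note the design choice in updateHighWatermark(): rather than mutating the snapshot in place, it returns a fresh ValidReaderWriteIdList with only the watermark replaced, so a snapshot shared with other readers is never altered. A short sketch of that copy-on-write behavior, with made-up ids:

    import java.util.BitSet;

    import org.apache.hadoop.hive.common.ValidReaderWriteIdList;

    public class WatermarkCopySketch {
      public static void main(String[] args) {
        ValidReaderWriteIdList original = new ValidReaderWriteIdList(
            "mydb.mytable", new long[] { 7L }, new BitSet(), 10L, 7L);
        ValidReaderWriteIdList lowered = original.updateHighWatermark(6L);
        System.out.println(original.getHighWatermark()); // 10: the original is untouched
        System.out.println(lowered.getHighWatermark());  // 6
      }
    }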