Repository: hive Updated Branches: refs/heads/master f5b14fc04 -> ddf3b6cd0
http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java index 77fe736..5e085f8 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.txn.AcidWriteSetService; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; +import org.apache.hadoop.hive.ql.TestTxnCommands2; import org.junit.After; import org.junit.Assert; import org.apache.hadoop.hive.common.FileUtils; @@ -87,7 +88,7 @@ public class TestDbTxnManager2 { private static HiveConf conf = new HiveConf(Driver.class); private HiveTxnManager txnMgr; private Context ctx; - private Driver driver; + private Driver driver, driver2; private TxnStore txnHandler; public TestDbTxnManager2() throws Exception { @@ -103,6 +104,7 @@ public class TestDbTxnManager2 { SessionState.start(conf); ctx = new Context(conf); driver = new Driver(new QueryState.Builder().withHiveConf(conf).nonIsolated().build(), null); + driver2 = new Driver(new QueryState.Builder().withHiveConf(conf).build(), null); TxnDbUtil.cleanDb(conf); TxnDbUtil.prepDb(conf); SessionState ss = SessionState.get(); @@ -115,6 +117,7 @@ public class TestDbTxnManager2 { @After public void tearDown() throws Exception { driver.close(); + driver2.close(); if (txnMgr != null) { txnMgr.closeTxnManager(); } @@ -548,10 +551,10 @@ public class TestDbTxnManager2 { checkCmdOnDriver(cpr); count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='i' and CQ_TYPE='i'"); Assert.assertEquals(1, count); - org.apache.hadoop.hive.ql.TestTxnCommands2.runWorker(conf); + TestTxnCommands2.runWorker(conf); count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='r' and CQ_TYPE='i'"); Assert.assertEquals(1, count); - org.apache.hadoop.hive.ql.TestTxnCommands2.runCleaner(conf); + TestTxnCommands2.runCleaner(conf); count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11'"); Assert.assertEquals(0, count); count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t11' and CC_STATE='s' and CC_TYPE='i'"); @@ -561,10 +564,10 @@ public class TestDbTxnManager2 { checkCmdOnDriver(cpr); count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p' and CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='i' and CQ_TYPE='i'"); Assert.assertEquals(1, count); - org.apache.hadoop.hive.ql.TestTxnCommands2.runWorker(conf); + TestTxnCommands2.runWorker(conf); count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p' and CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='r' and CQ_TYPE='i'"); Assert.assertEquals(1, count); - org.apache.hadoop.hive.ql.TestTxnCommands2.runCleaner(conf); + TestTxnCommands2.runCleaner(conf); count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p'"); Assert.assertEquals(0, count); count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t12p' and CC_STATE='s' and CC_TYPE='i'"); @@ -576,7 +579,7 @@ public class TestDbTxnManager2 { checkCmdOnDriver(cpr); count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='i' and CQ_TYPE='a'"); Assert.assertEquals(1, count); - org.apache.hadoop.hive.ql.TestTxnCommands2.runWorker(conf); // will fail + TestTxnCommands2.runWorker(conf); // will fail count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='i' and CQ_TYPE='a'"); Assert.assertEquals(0, count); count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t11' and CC_STATE='f' and CC_TYPE='a'"); @@ -586,7 +589,7 @@ public class TestDbTxnManager2 { checkCmdOnDriver(cpr); count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p' and CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='i' and CQ_TYPE='a'"); Assert.assertEquals(1, count); - org.apache.hadoop.hive.ql.TestTxnCommands2.runWorker(conf); // will fail + TestTxnCommands2.runWorker(conf); // will fail count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p' and CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='i' and CQ_TYPE='a'"); Assert.assertEquals(0, count); count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t12p' and CC_STATE='f' and CC_TYPE='a'"); @@ -824,7 +827,7 @@ public class TestDbTxnManager2 { * the TxnManager instance in the session (hacky but nothing is actually threading so it allows us * to write good tests) */ - private static HiveTxnManager swapTxnManager(HiveTxnManager txnMgr) { + public static HiveTxnManager swapTxnManager(HiveTxnManager txnMgr) { return SessionState.get().setTxnMgr(txnMgr); } @Test http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java index 124c97e..cfd7290 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java @@ -359,7 +359,7 @@ public abstract class CompactorTest { } FSDataOutputStream out = fs.create(partFile); if (type == FileType.LENGTH_FILE) { - out.writeInt(numRecords); + out.writeInt(numRecords);//hmm - length files should store length in bytes... 
} else { for (int i = 0; i < numRecords; i++) { RecordIdentifier ri = new RecordIdentifier(maxTxn - 1, bucket, i); http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java index ce574b4..467851a 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java @@ -18,34 +18,20 @@ package org.apache.hadoop.hive.ql.txn.compactor; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.CompactionRequest; import org.apache.hadoop.hive.metastore.api.CompactionType; -import org.apache.hadoop.hive.metastore.api.DataOperationType; -import org.apache.hadoop.hive.metastore.api.LockComponent; -import org.apache.hadoop.hive.metastore.api.LockLevel; -import org.apache.hadoop.hive.metastore.api.LockRequest; -import org.apache.hadoop.hive.metastore.api.LockResponse; -import org.apache.hadoop.hive.metastore.api.LockType; -import org.apache.hadoop.hive.metastore.api.OpenTxnRequest; -import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; -import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement; import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.api.UnlockRequest; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.junit.After; import org.junit.Assert; import org.junit.Test; -import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; /** * Tests for the compactor Cleaner thread @@ -198,231 +184,6 @@ public class TestCleaner extends CompactorTest { } @Test - public void blockedByLockTable() throws Exception { - Table t = newTable("default", "bblt", false); - - addBaseFile(t, null, 20L, 20); - addDeltaFile(t, null, 21L, 22L, 2); - addDeltaFile(t, null, 23L, 24L, 2); - addDeltaFile(t, null, 21L, 24L, 4); - - burnThroughTransactions("default", "bblt", 25); - - CompactionRequest rqst = new CompactionRequest("default", "bblt", CompactionType.MINOR); - txnHandler.compact(rqst); - CompactionInfo ci = txnHandler.findNextToCompact("fred"); - txnHandler.markCompacted(ci); - txnHandler.setRunAs(ci.id, System.getProperty("user.name")); - - LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "default"); - comp.setTablename("bblt"); - comp.setOperationType(DataOperationType.SELECT); - List<LockComponent> components = new ArrayList<LockComponent>(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - - startCleaner(); - - // Check there are no compactions requests left. 
- ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - List<ShowCompactResponseElement> compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); - Assert.assertEquals("bblt", compacts.get(0).getTablename()); - Assert.assertEquals(CompactionType.MINOR, compacts.get(0).getType()); - } - - @Test - public void blockedByLockPartition() throws Exception { - Table t = newTable("default", "bblp", true); - Partition p = newPartition(t, "today"); - - addBaseFile(t, p, 20L, 20); - addDeltaFile(t, p, 21L, 22L, 2); - addDeltaFile(t, p, 23L, 24L, 2); - addDeltaFile(t, p, 21L, 24L, 4); - - burnThroughTransactions("default", "bblp", 25); - - CompactionRequest rqst = new CompactionRequest("default", "bblp", CompactionType.MINOR); - rqst.setPartitionname("ds=today"); - txnHandler.compact(rqst); - CompactionInfo ci = txnHandler.findNextToCompact("fred"); - txnHandler.markCompacted(ci); - txnHandler.setRunAs(ci.id, System.getProperty("user.name")); - - LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default"); - comp.setTablename("bblp"); - comp.setPartitionname("ds=today"); - comp.setOperationType(DataOperationType.DELETE); - List<LockComponent> components = new ArrayList<LockComponent>(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - OpenTxnsResponse resp = txnHandler.openTxns(new OpenTxnRequest(1, "Dracula", "Transylvania")); - req.setTxnid(resp.getTxn_ids().get(0)); - LockResponse res = txnHandler.lock(req); - - startCleaner(); - - // Check there are no compactions requests left. - ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - List<ShowCompactResponseElement> compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); - Assert.assertEquals("bblp", compacts.get(0).getTablename()); - Assert.assertEquals("ds=today", compacts.get(0).getPartitionname()); - Assert.assertEquals(CompactionType.MINOR, compacts.get(0).getType()); - } - - @Test - public void notBlockedBySubsequentLock() throws Exception { - Table t = newTable("default", "bblt", false); - - // Set the run frequency low on this test so it doesn't take long - conf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, 100, - TimeUnit.MILLISECONDS); - - addBaseFile(t, null, 20L, 20); - addDeltaFile(t, null, 21L, 22L, 2); - addDeltaFile(t, null, 23L, 24L, 2); - addDeltaFile(t, null, 21L, 24L, 4); - - burnThroughTransactions("default", "bblt", 25); - - CompactionRequest rqst = new CompactionRequest("default", "bblt", CompactionType.MINOR); - txnHandler.compact(rqst); - CompactionInfo ci = txnHandler.findNextToCompact("fred"); - txnHandler.markCompacted(ci); - txnHandler.setRunAs(ci.id, System.getProperty("user.name")); - - LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "default"); - comp.setTablename("bblt"); - comp.setOperationType(DataOperationType.INSERT); - List<LockComponent> components = new ArrayList<LockComponent>(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - - AtomicBoolean looped = new AtomicBoolean(); - looped.set(false); - startCleaner(looped); - - // Make sure the compactor has a chance to run once - while (!looped.get()) { - Thread.currentThread().sleep(100); - } - - // There should still be one 
request, as the locks still held. - ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - List<ShowCompactResponseElement> compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - - // obtain a second lock. This shouldn't block cleaner as it was acquired after the initial - // clean request - LockComponent comp2 = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "default"); - comp2.setTablename("bblt"); - comp.setOperationType(DataOperationType.SELECT); - List<LockComponent> components2 = new ArrayList<LockComponent>(1); - components2.add(comp2); - LockRequest req2 = new LockRequest(components, "me", "localhost"); - LockResponse res2 = txnHandler.lock(req2); - - // Unlock the previous lock - txnHandler.unlock(new UnlockRequest(res.getLockid())); - looped.set(false); - - while (!looped.get()) { - Thread.currentThread().sleep(100); - } - stopThread(); - Thread.currentThread().sleep(200); - - - // Check there are no compactions requests left. - rsp = txnHandler.showCompact(new ShowCompactRequest()); - compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState())); - } - - @Test - public void partitionNotBlockedBySubsequentLock() throws Exception { - Table t = newTable("default", "bblt", true); - Partition p = newPartition(t, "today"); - - // Set the run frequency low on this test so it doesn't take long - conf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, 100, - TimeUnit.MILLISECONDS); - - addBaseFile(t, p, 20L, 20); - addDeltaFile(t, p, 21L, 22L, 2); - addDeltaFile(t, p, 23L, 24L, 2); - addDeltaFile(t, p, 21L, 24L, 4); - - burnThroughTransactions("default", "bblt", 25); - - CompactionRequest rqst = new CompactionRequest("default", "bblt", CompactionType.MINOR); - rqst.setPartitionname("ds=today"); - txnHandler.compact(rqst); - CompactionInfo ci = txnHandler.findNextToCompact("fred"); - txnHandler.markCompacted(ci); - txnHandler.setRunAs(ci.id, System.getProperty("user.name")); - - LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.PARTITION, "default"); - comp.setTablename("bblt"); - comp.setPartitionname("ds=today"); - comp.setOperationType(DataOperationType.INSERT); - List<LockComponent> components = new ArrayList<LockComponent>(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - - AtomicBoolean looped = new AtomicBoolean(); - looped.set(false); - startCleaner(looped); - - // Make sure the compactor has a chance to run once - while (!looped.get()) { - Thread.currentThread().sleep(100); - } - - // There should still be one request, as the locks still held. - ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - List<ShowCompactResponseElement> compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - - - // obtain a second lock. 
This shouldn't block cleaner as it was acquired after the initial - // clean request - LockComponent comp2 = new LockComponent(LockType.SHARED_READ, LockLevel.PARTITION, "default"); - comp2.setTablename("bblt"); - comp2.setPartitionname("ds=today"); - comp.setOperationType(DataOperationType.SELECT); - List<LockComponent> components2 = new ArrayList<LockComponent>(1); - components2.add(comp2); - LockRequest req2 = new LockRequest(components, "me", "localhost"); - LockResponse res2 = txnHandler.lock(req2); - - // Unlock the previous lock - txnHandler.unlock(new UnlockRequest(res.getLockid())); - looped.set(false); - - while (!looped.get()) { - Thread.currentThread().sleep(100); - } - stopThread(); - Thread.currentThread().sleep(200); - - - // Check there are no compactions requests left. - rsp = txnHandler.showCompact(new ShowCompactRequest()); - compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState())); - } - - @Test public void cleanupAfterMajorPartitionCompactionNoBase() throws Exception { Table t = newTable("default", "campcnb", true); Partition p = newPartition(t, "today"); http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java index 488cd90..d9e4468 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java @@ -300,7 +300,7 @@ public class TestWorker extends CompactorTest { // Find the new delta file and make sure it has the right contents boolean sawNewDelta = false; for (int i = 0; i < stat.length; i++) { - if (stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21, 24))) { + if (stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21, 24) + "_v0000026")) { sawNewDelta = true; FileStatus[] buckets = fs.listStatus(stat[i].getPath(), AcidUtils.hiddenFileFilter); Assert.assertEquals(2, buckets.length); @@ -309,7 +309,7 @@ public class TestWorker extends CompactorTest { Assert.assertEquals(104L, buckets[0].getLen()); Assert.assertEquals(104L, buckets[1].getLen()); } - if (stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(21, 24))) { + if (stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(21, 24) + "_v0000026")) { sawNewDelta = true; FileStatus[] buckets = fs.listStatus(stat[i].getPath(), AcidUtils.hiddenFileFilter); Assert.assertEquals(2, buckets.length); @@ -322,7 +322,7 @@ public class TestWorker extends CompactorTest { LOG.debug("This is not the delta file you are looking for " + stat[i].getPath().getName()); } } - Assert.assertTrue(sawNewDelta); + Assert.assertTrue(toString(stat), sawNewDelta); } /** @@ -354,15 +354,22 @@ public class TestWorker extends CompactorTest { // There should still now be 5 directories in the location FileSystem fs = FileSystem.get(conf); FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation())); - Assert.assertEquals(5, stat.length); + Assert.assertEquals(toString(stat),6 , stat.length); // Find the new delta file and make sure it has the right contents Arrays.sort(stat); Assert.assertEquals("base_20", stat[0].getPath().getName()); - Assert.assertEquals(makeDeleteDeltaDirNameCompacted(21, 22), 
stat[1].getPath().getName()); + /** + * this may look a bit odd. Compactor is capped at min open write id which is 23 in this case + * so the minor compaction above has only 1 dir as input, delta_21_22 and outputs + * delta_21_22_v28 (and matching delete_delta) (HIVE-9995/HIVE-20901) + */ + Assert.assertEquals(makeDeleteDeltaDirNameCompacted(21, 22) + "_v0000028", + stat[1].getPath().getName()); Assert.assertEquals(makeDeltaDirNameCompacted(21, 22), stat[2].getPath().getName()); - Assert.assertEquals(makeDeltaDirName(23, 25), stat[3].getPath().getName()); - Assert.assertEquals(makeDeltaDirName(26, 27), stat[4].getPath().getName()); + Assert.assertEquals(makeDeltaDirNameCompacted(21, 22) + "_v0000028", stat[3].getPath().getName()); + Assert.assertEquals(makeDeltaDirName(23, 25), stat[4].getPath().getName()); + Assert.assertEquals(makeDeltaDirName(26, 27), stat[5].getPath().getName()); } @Test @@ -395,9 +402,9 @@ public class TestWorker extends CompactorTest { // Find the new delta file and make sure it has the right contents Arrays.sort(stat); Assert.assertEquals("base_20", stat[0].getPath().getName()); - Assert.assertEquals(makeDeleteDeltaDirNameCompacted(21, 27), stat[1].getPath().getName()); + Assert.assertEquals(makeDeleteDeltaDirNameCompacted(21, 27) + "_v0000028", stat[1].getPath().getName()); Assert.assertEquals(makeDeltaDirName(21, 22), stat[2].getPath().getName()); - Assert.assertEquals(makeDeltaDirNameCompacted(21, 27), stat[3].getPath().getName()); + Assert.assertEquals(makeDeltaDirNameCompacted(21, 27) + "_v0000028", stat[3].getPath().getName()); Assert.assertEquals(makeDeltaDirName(23, 25), stat[4].getPath().getName()); Assert.assertEquals(makeDeltaDirName(26, 27), stat[5].getPath().getName()); } @@ -432,7 +439,7 @@ public class TestWorker extends CompactorTest { // Find the new delta file and make sure it has the right contents boolean sawNewDelta = false; for (int i = 0; i < stat.length; i++) { - if (stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21, 24))) { + if (stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21, 24) + "_v0000026")) { sawNewDelta = true; FileStatus[] buckets = fs.listStatus(stat[i].getPath(), AcidUtils.hiddenFileFilter); Assert.assertEquals(2, buckets.length); @@ -453,7 +460,7 @@ public class TestWorker extends CompactorTest { LOG.debug("This is not the delta file you are looking for " + stat[i].getPath().getName()); } } - Assert.assertTrue(sawNewDelta); + Assert.assertTrue(toString(stat), sawNewDelta); } @Test @@ -484,7 +491,7 @@ public class TestWorker extends CompactorTest { // Find the new delta file and make sure it has the right contents boolean sawNewDelta = false; for (int i = 0; i < stat.length; i++) { - if (stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(1, 4))) { + if (stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(1, 4) + "_v0000006")) { sawNewDelta = true; FileStatus[] buckets = fs.listStatus(stat[i].getPath(), AcidUtils.hiddenFileFilter); Assert.assertEquals(2, buckets.length); @@ -493,7 +500,7 @@ public class TestWorker extends CompactorTest { Assert.assertEquals(104L, buckets[0].getLen()); Assert.assertEquals(104L, buckets[1].getLen()); } - if (stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(1, 4))) { + if (stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(1, 4) + "_v0000006")) { sawNewDelta = true; FileStatus[] buckets = fs.listStatus(stat[i].getPath(), AcidUtils.hiddenFileFilter); Assert.assertEquals(2, buckets.length); @@ -505,7 +512,7 @@ public class 
TestWorker extends CompactorTest { LOG.debug("This is not the delta file you are looking for " + stat[i].getPath().getName()); } } - Assert.assertTrue(sawNewDelta); + Assert.assertTrue(toString(stat), sawNewDelta); } @Test @@ -537,7 +544,7 @@ public class TestWorker extends CompactorTest { // Find the new delta file and make sure it has the right contents boolean sawNewBase = false; for (int i = 0; i < stat.length; i++) { - if (stat[i].getPath().getName().equals("base_0000024")) { + if (stat[i].getPath().getName().equals("base_0000024_v0000026")) { sawNewBase = true; FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); Assert.assertEquals(2, buckets.length); @@ -549,7 +556,7 @@ public class TestWorker extends CompactorTest { LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName()); } } - Assert.assertTrue(sawNewBase); + Assert.assertTrue(toString(stat), sawNewBase); } @Test @@ -612,68 +619,37 @@ public class TestWorker extends CompactorTest { Assert.assertEquals(numFilesExpected, stat.length); // Find the new delta file and make sure it has the right contents - BitSet matchesFound = new BitSet(numFilesExpected); - for (int i = 0, j = 0; i < stat.length; i++) { - if(stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(21,23))) { - matchesFound.set(j++); - } - if(stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(25,33))) { - matchesFound.set(j++); - } - if(stat[i].getPath().getName().equals(makeDeltaDirName(21,21))) { - matchesFound.set(j++); - } - else if(stat[i].getPath().getName().equals(makeDeltaDirName(23, 23))) { - matchesFound.set(j++); - } - else if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(25, 29))) { - matchesFound.set(j++); - } - else if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(31, 32))) { - matchesFound.set(j++); - } - else if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(31, 33))) { - matchesFound.set(j++); - } - else if(stat[i].getPath().getName().equals(makeDeltaDirName(35, 35))) { - matchesFound.set(j++); - } - else if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21,23))) { - matchesFound.set(j++); - } - else if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(25,33))) { - matchesFound.set(j++); - } - switch (type) { - case MINOR: - if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21,35))) { - matchesFound.set(j++); - } - if (stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(21, 35))) { - matchesFound.set(j++); - } - break; - case MAJOR: - if(stat[i].getPath().getName().equals(AcidUtils.baseDir(35))) { - matchesFound.set(j++); - } - break; - default: - throw new IllegalStateException(); - } + List<String> matchesNotFound = new ArrayList<>(numFilesExpected); + matchesNotFound.add(makeDeleteDeltaDirNameCompacted(21,23) + "_v\\d+"); + matchesNotFound.add(makeDeleteDeltaDirNameCompacted(25,33) + "_v\\d+"); + matchesNotFound.add(makeDeltaDirName(21,21)); + matchesNotFound.add(makeDeltaDirName(23, 23)); + matchesNotFound.add(makeDeltaDirNameCompacted(25, 29));//streaming ingest + matchesNotFound.add(makeDeltaDirNameCompacted(31, 32));//streaming ingest + //todo: this should have some _vXXXX suffix but addDeltaFile() doesn't support it + matchesNotFound.add(makeDeltaDirNameCompacted(31, 33)); + matchesNotFound.add(makeDeltaDirName(35, 35)); + matchesNotFound.add(makeDeltaDirNameCompacted(21,23) + "_v\\d+"); + matchesNotFound.add(makeDeltaDirNameCompacted(25,33) + "_v\\d+"); 
+ if(type == CompactionType.MINOR) { + matchesNotFound.add(makeDeltaDirNameCompacted(21,35) + "_v\\d+"); + matchesNotFound.add(makeDeleteDeltaDirNameCompacted(21, 35) + "_v\\d+"); + } + if(type == CompactionType.MAJOR) { + matchesNotFound.add(AcidUtils.baseDir(35) + "_v\\d+"); } - StringBuilder sb = null; - for(int i = 0; i < stat.length; i++) { - if(!matchesFound.get(i)) { - if(sb == null) { - sb = new StringBuilder("Some files are missing at index: "); + for(FileStatus f : stat) { + for(int j = 0; j < matchesNotFound.size(); j++) { + if (f.getPath().getName().matches(matchesNotFound.get(j))) { + matchesNotFound.remove(j); + break; } - sb.append(i).append(","); } } - if (sb != null) { - Assert.assertTrue(sb.toString(), false); + if(matchesNotFound.size() == 0) { + return; } + Assert.assertTrue("Files remaining: " + matchesNotFound + "; " + toString(stat), false); } @Test public void majorPartitionWithBase() throws Exception { @@ -706,7 +682,7 @@ public class TestWorker extends CompactorTest { // Find the new delta file and make sure it has the right contents boolean sawNewBase = false; for (int i = 0; i < stat.length; i++) { - if (stat[i].getPath().getName().equals("base_0000024")) { + if (stat[i].getPath().getName().equals("base_0000024_v0000026")) { sawNewBase = true; FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); Assert.assertEquals(2, buckets.length); @@ -718,7 +694,7 @@ public class TestWorker extends CompactorTest { LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName()); } } - Assert.assertTrue(sawNewBase); + Assert.assertTrue(toString(stat), sawNewBase); } @Test @@ -749,7 +725,7 @@ public class TestWorker extends CompactorTest { // Find the new delta file and make sure it has the right contents boolean sawNewBase = false; for (int i = 0; i < stat.length; i++) { - if (stat[i].getPath().getName().equals("base_0000004")) { + if (stat[i].getPath().getName().equals("base_0000004_v0000005")) { sawNewBase = true; FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); Assert.assertEquals(2, buckets.length); @@ -761,9 +737,20 @@ public class TestWorker extends CompactorTest { LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName()); } } - Assert.assertTrue(sawNewBase); + Assert.assertTrue(toString(stat), sawNewBase); } + private static String toString(FileStatus[] stat) { + StringBuilder sb = new StringBuilder("stat{"); + if(stat == null) { + return sb.toString(); + } + for(FileStatus f : stat) { + sb.append(f.getPath()).append(","); + } + sb.setCharAt(sb.length() - 1, '}'); + return sb.toString(); + } @Test public void majorTableLegacy() throws Exception { LOG.debug("Starting majorTableLegacy"); @@ -793,7 +780,7 @@ public class TestWorker extends CompactorTest { // Find the new delta file and make sure it has the right contents boolean sawNewBase = false; for (int i = 0; i < stat.length; i++) { - if (stat[i].getPath().getName().equals("base_0000024")) { + if (stat[i].getPath().getName().equals("base_0000024_v0000026")) { sawNewBase = true; FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); Assert.assertEquals(2, buckets.length); @@ -805,7 +792,7 @@ public class TestWorker extends CompactorTest { LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName()); } } - Assert.assertTrue(sawNewBase); + Assert.assertTrue(toString(stat), sawNewBase); } @Test @@ -836,7 +823,7 @@ public class 
TestWorker extends CompactorTest { // Find the new delta file and make sure it has the right contents boolean sawNewDelta = false; for (int i = 0; i < stat.length; i++) { - if (stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21, 24))) { + if (stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21, 24) + "_v0000026")) { sawNewDelta = true; FileStatus[] buckets = fs.listStatus(stat[i].getPath(), AcidUtils.hiddenFileFilter); Assert.assertEquals(2, buckets.length); @@ -846,7 +833,7 @@ public class TestWorker extends CompactorTest { LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName()); } } - Assert.assertTrue(sawNewDelta); + Assert.assertTrue(toString(stat), sawNewDelta); } @Test @@ -881,7 +868,7 @@ public class TestWorker extends CompactorTest { // Find the new delta file and make sure it has the right contents boolean sawNewBase = false; for (int i = 0; i < stat.length; i++) { - if (stat[i].getPath().getName().equals("base_0000026")) { + if (stat[i].getPath().getName().equals("base_0000026_v0000028")) { sawNewBase = true; FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); Assert.assertEquals(2, buckets.length); @@ -901,7 +888,7 @@ public class TestWorker extends CompactorTest { LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName()); } } - Assert.assertTrue(sawNewBase); + Assert.assertTrue(toString(stat), sawNewBase); } @Test @@ -933,7 +920,7 @@ public class TestWorker extends CompactorTest { // Find the new delta file and make sure it has the right contents Arrays.sort(stat); - Assert.assertEquals("base_0000022", stat[0].getPath().getName()); + Assert.assertEquals("base_0000022_v0000028", stat[0].getPath().getName()); Assert.assertEquals("base_20", stat[1].getPath().getName()); Assert.assertEquals(makeDeltaDirName(21, 22), stat[2].getPath().getName()); Assert.assertEquals(makeDeltaDirName(23, 25), stat[3].getPath().getName()); @@ -969,7 +956,7 @@ public class TestWorker extends CompactorTest { // Find the new delta file and make sure it has the right contents Arrays.sort(stat); - Assert.assertEquals("base_0000027", stat[0].getPath().getName()); + Assert.assertEquals("base_0000027_v0000028", stat[0].getPath().getName()); Assert.assertEquals("base_20", stat[1].getPath().getName()); Assert.assertEquals(makeDeltaDirName(21, 22), stat[2].getPath().getName()); Assert.assertEquals(makeDeltaDirName(23, 25), stat[3].getPath().getName()); http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnCommonUtils.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnCommonUtils.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnCommonUtils.java index c61a997..43cc805 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnCommonUtils.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnCommonUtils.java @@ -89,7 +89,7 @@ public class TxnCommonUtils { * {@link org.apache.hadoop.hive.common.ValidTxnWriteIdList}. This assumes that the caller intends to * read the files, and thus treats both open and aborted transactions as invalid. 
* @param currentTxnId current txn ID for which we get the valid write ids list - * @param list valid write ids list from the metastore + * @param validIds valid write ids list from the metastore * @return a valid write IDs list for the whole transaction. */ public static ValidTxnWriteIdList createValidTxnWriteIdList(Long currentTxnId, http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index cbb76d5..7202cc6 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -17,10 +17,10 @@ */ package org.apache.hadoop.hive.metastore.txn; -import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.common.classification.RetrySemantics; import org.apache.hadoop.hive.metastore.api.CompactionType; import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.OpenTxnRequest; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; import org.apache.hadoop.util.StringUtils; @@ -324,6 +324,57 @@ class CompactionTxnHandler extends TxnHandler { return findReadyToClean(); } } + @Override + public long findMinOpenTxnId() throws MetaException { + Connection dbConn = null; + Statement stmt = null; + ResultSet rs = null; + try { + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + return findMinOpenTxnGLB(stmt); + } catch (SQLException e) { + LOG.error("Unable to findMinOpenTxnId() due to:" + e.getMessage()); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "findMinOpenTxnId"); + throw new MetaException("Unable to execute findMinOpenTxnId() " + + StringUtils.stringifyException(e)); + } finally { + close(rs, stmt, dbConn); + } + } catch (RetryException e) { + return findMinOpenTxnId(); + } + } + + /** + * See doc at {@link TxnStore#findMinOpenTxnId()} + * Note that {@link #openTxns(OpenTxnRequest)} makes update of NEXT_TXN and MIN_HISTORY_LEVEL + * a single atomic operation (and no one else should update these tables except the cleaner + * which deletes rows from MIN_HISTORY_LEVEL which can only allow minOpenTxn to move higher) + */ + private long findMinOpenTxnGLB(Statement stmt) throws MetaException, SQLException { + String s = "select ntxn_next from NEXT_TXN_ID"; + LOG.debug("Going to execute query <" + s + ">"); + ResultSet rs = stmt.executeQuery(s); + if (!rs.next()) { + throw new MetaException("Transaction tables not properly " + + "initialized, no record found in next_txn_id"); + } + long hwm = rs.getLong(1); + s = "select min(mhl_min_open_txnid) from MIN_HISTORY_LEVEL"; + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + rs.next(); + long minOpenTxnId = rs.getLong(1); + if(rs.wasNull()) { + return hwm; + } + //since generating new txnid uses select for update on single row in NEXT_TXN_ID + assert hwm >= minOpenTxnId : "(hwm, minOpenTxnId)=(" + hwm + "," + 
minOpenTxnId + ")"; + return minOpenTxnId; + } /** * This will remove an entry from the queue after @@ -387,11 +438,16 @@ class CompactionTxnHandler extends TxnHandler { pStmt.setLong(paramCount++, info.highestWriteId); } LOG.debug("Going to execute update <" + s + ">"); - if (pStmt.executeUpdate() < 1) { + if ((updCount = pStmt.executeUpdate()) < 1) { LOG.error("Expected to remove at least one row from completed_txn_components when " + "marking compaction entry as clean!"); } - + /** + * compaction may remove data from aborted txns above tc_writeid but it only guarantees to + * remove it up to (inclusive) tc_writeid, so it's critical to not remove metadata about + * aborted TXN_COMPONENTS above tc_writeid (and consequently about aborted txns). + * See {@link ql.txn.compactor.Cleaner.removeFiles()} + */ s = "select distinct txn_id from TXNS, TXN_COMPONENTS where txn_id = tc_txnid and txn_state = '" + TXN_ABORTED + "' and tc_database = ? and tc_table = ?"; if (info.highestWriteId != 0) s += " and tc_writeid <= ?"; @@ -480,7 +536,6 @@ class CompactionTxnHandler extends TxnHandler { markCleaned(info); } } - /** * Clean up entries from TXN_TO_WRITE_ID table less than min_uncommited_txnid as found by * min(NEXT_TXN_ID.ntxn_next, min(MIN_HISTORY_LEVEL.mhl_min_open_txnid), min(Aborted TXNS.txn_id)). @@ -502,30 +557,11 @@ class CompactionTxnHandler extends TxnHandler { // First need to find the min_uncommitted_txnid which is currently seen by any open transactions. // If there are no txns which are currently open or aborted in the system, then current value of // NEXT_TXN_ID.ntxn_next could be min_uncommitted_txnid. - String s = "select ntxn_next from NEXT_TXN_ID"; - LOG.debug("Going to execute query <" + s + ">"); - rs = stmt.executeQuery(s); - if (!rs.next()) { - throw new MetaException("Transaction tables not properly " + - "initialized, no record found in next_txn_id"); - } - long minUncommittedTxnId = rs.getLong(1); - - // If there are any open txns, then the minimum of min_open_txnid from MIN_HISTORY_LEVEL table - // could be the min_uncommitted_txnid if lesser than NEXT_TXN_ID.ntxn_next. - s = "select min(mhl_min_open_txnid) from MIN_HISTORY_LEVEL"; - LOG.debug("Going to execute query <" + s + ">"); - rs = stmt.executeQuery(s); - if (rs.next()) { - long minOpenTxnId = rs.getLong(1); - if (minOpenTxnId > 0) { - minUncommittedTxnId = Math.min(minOpenTxnId, minUncommittedTxnId); - } - } + long minUncommittedTxnId = findMinOpenTxnGLB(stmt); // If there are aborted txns, then the minimum aborted txnid could be the min_uncommitted_txnid // if lesser than both NEXT_TXN_ID.ntxn_next and min(MIN_HISTORY_LEVEL .mhl_min_open_txnid). - s = "select min(txn_id) from TXNS where txn_state = " + quoteChar(TXN_ABORTED); + String s = "select min(txn_id) from TXNS where txn_state = " + quoteChar(TXN_ABORTED); LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); if (rs.next()) { @@ -534,7 +570,6 @@ class CompactionTxnHandler extends TxnHandler { minUncommittedTxnId = Math.min(minAbortedTxnId, minUncommittedTxnId); } } - // As all txns below min_uncommitted_txnid are either committed or empty_aborted, we are allowed // to cleanup the entries less than min_uncommitted_txnid from the TXN_TO_WRITE_ID table. 
s = "delete from TXN_TO_WRITE_ID where t2w_txnid < " + minUncommittedTxnId; http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java index 22ce007..ca47a44 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java @@ -357,6 +357,16 @@ public interface TxnStore extends Configurable { List<CompactionInfo> findReadyToClean() throws MetaException; /** + * Returns the smallest txnid that could be seen in open state across all active transactions in + * the system or {@code NEXT_TXN_ID.NTXN_NEXT} if there are no active transactions, i.e. the + * smallest txnid that can be seen as unresolved in the whole system. Even if a transaction + * is opened concurrently with this call it cannot have an id less than what this method returns. + * @return transaction ID + */ + @RetrySemantics.ReadOnly + long findMinOpenTxnId() throws MetaException; + + /** * This will remove an entry from the queue after * it has been compacted. * http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java index 3bb1f0c..cd77b4e 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java @@ -19,8 +19,10 @@ package org.apache.hadoop.hive.metastore.txn; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.ValidCompactorWriteIdList; -import org.apache.hadoop.hive.common.ValidReaderWriteIdList; +import org.apache.hadoop.hive.common.ValidReadTxnList; +import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.metastore.TransactionalValidationListener; +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.TableValidWriteIds; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; @@ -39,12 +41,30 @@ import java.util.Map; public class TxnUtils { private static final Logger LOG = LoggerFactory.getLogger(TxnUtils.class); - // Transactional stats states - static final public char STAT_OPEN = 'o'; - static final public char STAT_INVALID = 'i'; - static final public char STAT_COMMITTED = 'c'; - static final public char STAT_OBSOLETE = 's'; - + public static ValidTxnList createValidTxnListForCleaner(GetOpenTxnsResponse txns, long minOpenTxnGLB) { + long highWaterMark = minOpenTxnGLB - 1; + long[] abortedTxns = new long[txns.getOpen_txnsSize()]; + BitSet abortedBits = BitSet.valueOf(txns.getAbortedBits()); + int i = 0; + for(long txnId : txns.getOpen_txns()) { + if(txnId > 
highWaterMark) { + break; + } + if(abortedBits.get(i)) { + abortedTxns[i] = txnId; + } + else { + assert false : JavaUtils.txnIdToString(txnId) + " is open and <= hwm:" + highWaterMark; + } + ++i; + } + abortedTxns = Arrays.copyOf(abortedTxns, i); + BitSet bitSet = new BitSet(abortedTxns.length); + bitSet.set(0, abortedTxns.length); + //add ValidCleanerTxnList? - could be problematic for all the places that read it from + // string as they'd have to know which object to instantiate + return new ValidReadTxnList(abortedTxns, bitSet, highWaterMark, Long.MAX_VALUE); + } /** * Transform a {@link org.apache.hadoop.hive.metastore.api.TableValidWriteIds} to a * {@link org.apache.hadoop.hive.common.ValidCompactorWriteIdList}. This assumes that the caller intends to @@ -83,17 +103,6 @@ public class TxnUtils { } } - public static ValidReaderWriteIdList updateForCompactionQuery(ValidReaderWriteIdList ids) { - // This is based on the existing valid write ID list that was built for a select query; - // therefore we assume all the aborted txns, etc. were already accounted for. - // All we do is adjust the high watermark to only include contiguous txns. - Long minOpenWriteId = ids.getMinOpenWriteId(); - if (minOpenWriteId != null && minOpenWriteId != Long.MAX_VALUE) { - return ids.updateHighWatermark(ids.getMinOpenWriteId() - 1); - } - return ids; - } - /** * Get an instance of the TxnStore that is appropriate for this store * @param conf configuration http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java index 2c9d98b..2a70ec3 100644 --- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java +++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java @@ -187,63 +187,6 @@ public class TestHiveMetaStoreTxns { } @Test - public void testTxnRange() throws Exception { - ValidTxnList validTxns = client.getValidTxns(); - Assert.assertEquals(ValidTxnList.RangeResponse.NONE, - validTxns.isTxnRangeValid(1L, 3L)); - List<Long> tids = client.openTxns("me", 5).getTxn_ids(); - - HeartbeatTxnRangeResponse rsp = client.heartbeatTxnRange(1, 5); - Assert.assertEquals(0, rsp.getNosuch().size()); - Assert.assertEquals(0, rsp.getAborted().size()); - - client.rollbackTxn(1L); - client.commitTxn(2L); - client.commitTxn(3L); - client.commitTxn(4L); - validTxns = client.getValidTxns(); - System.out.println("validTxns = " + validTxns); - Assert.assertEquals(ValidTxnList.RangeResponse.ALL, - validTxns.isTxnRangeValid(2L, 2L)); - Assert.assertEquals(ValidTxnList.RangeResponse.ALL, - validTxns.isTxnRangeValid(2L, 3L)); - Assert.assertEquals(ValidTxnList.RangeResponse.ALL, - validTxns.isTxnRangeValid(2L, 4L)); - Assert.assertEquals(ValidTxnList.RangeResponse.ALL, - validTxns.isTxnRangeValid(3L, 4L)); - - Assert.assertEquals(ValidTxnList.RangeResponse.SOME, - validTxns.isTxnRangeValid(1L, 4L)); - Assert.assertEquals(ValidTxnList.RangeResponse.SOME, - validTxns.isTxnRangeValid(2L, 5L)); - Assert.assertEquals(ValidTxnList.RangeResponse.SOME, - validTxns.isTxnRangeValid(1L, 2L)); - 
Assert.assertEquals(ValidTxnList.RangeResponse.SOME, - validTxns.isTxnRangeValid(4L, 5L)); - - Assert.assertEquals(ValidTxnList.RangeResponse.NONE, - validTxns.isTxnRangeValid(1L, 1L)); - Assert.assertEquals(ValidTxnList.RangeResponse.NONE, - validTxns.isTxnRangeValid(5L, 10L)); - - validTxns = new ValidReadTxnList("10:5:4,5,6:"); - Assert.assertEquals(ValidTxnList.RangeResponse.NONE, - validTxns.isTxnRangeValid(4,6)); - Assert.assertEquals(ValidTxnList.RangeResponse.ALL, - validTxns.isTxnRangeValid(7, 10)); - Assert.assertEquals(ValidTxnList.RangeResponse.SOME, - validTxns.isTxnRangeValid(7, 11)); - Assert.assertEquals(ValidTxnList.RangeResponse.SOME, - validTxns.isTxnRangeValid(3, 6)); - Assert.assertEquals(ValidTxnList.RangeResponse.SOME, - validTxns.isTxnRangeValid(4, 7)); - Assert.assertEquals(ValidTxnList.RangeResponse.SOME, - validTxns.isTxnRangeValid(1, 12)); - Assert.assertEquals(ValidTxnList.RangeResponse.ALL, - validTxns.isTxnRangeValid(1, 3)); - } - - @Test public void testLocks() throws Exception { LockRequestBuilder rqstBuilder = new LockRequestBuilder(); rqstBuilder.addLockComponent(new LockComponentBuilder() http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/storage-api/src/java/org/apache/hadoop/hive/common/ValidCompactorWriteIdList.java ---------------------------------------------------------------------- diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/ValidCompactorWriteIdList.java b/storage-api/src/java/org/apache/hadoop/hive/common/ValidCompactorWriteIdList.java index a849264..5d74241 100644 --- a/storage-api/src/java/org/apache/hadoop/hive/common/ValidCompactorWriteIdList.java +++ b/storage-api/src/java/org/apache/hadoop/hive/common/ValidCompactorWriteIdList.java @@ -80,6 +80,9 @@ public class ValidCompactorWriteIdList extends ValidReaderWriteIdList { /** * Returns org.apache.hadoop.hive.common.ValidWriteIdList.RangeResponse.ALL if all write ids in * the range are resolved and RangeResponse.NONE otherwise + * Streaming ingest may create something like delta_11_20. Compactor cannot include such delta in + * compaction until all transactions that write to it terminate. (Otherwise compactor + * will produce a delta that doesn't satisfy (D1 intersect D2 is empty or D1 intersect D2 = D2)). */ @Override public RangeResponse isWriteIdRangeValid(long minWriteId, long maxWriteId) { http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/storage-api/src/java/org/apache/hadoop/hive/common/ValidReaderWriteIdList.java ---------------------------------------------------------------------- diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/ValidReaderWriteIdList.java b/storage-api/src/java/org/apache/hadoop/hive/common/ValidReaderWriteIdList.java index 95a0b56..bc8ac0d 100644 --- a/storage-api/src/java/org/apache/hadoop/hive/common/ValidReaderWriteIdList.java +++ b/storage-api/src/java/org/apache/hadoop/hive/common/ValidReaderWriteIdList.java @@ -35,6 +35,12 @@ public class ValidReaderWriteIdList implements ValidWriteIdList { private long minOpenWriteId = Long.MAX_VALUE; protected long highWatermark; + /** + * This seems like a bad c'tor. It doesn't even have a table name in it and it's used every time + * ValidWriteIdList.VALID_WRITEIDS_KEY is not found in Configuration. + * But, if anything, that would indicate a bug if it was done for an acid read since it + * considers everything valid - this should not be assumed. 
+ */ public ValidReaderWriteIdList() { this(null, new long[0], new BitSet(), Long.MAX_VALUE, Long.MAX_VALUE); } http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/streaming/src/test/org/apache/hive/streaming/TestStreaming.java ---------------------------------------------------------------------- diff --git a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java index 50433b6..2170178 100644 --- a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java +++ b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java @@ -34,6 +34,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -52,6 +53,8 @@ import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.cli.CliSessionState; import org.apache.hadoop.hive.common.JavaUtils; +import org.apache.hadoop.hive.common.TableName; +import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.Validator; @@ -66,10 +69,12 @@ import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.ShowLocksRequest; import org.apache.hadoop.hive.metastore.api.ShowLocksResponse; import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement; +import org.apache.hadoop.hive.metastore.api.TableValidWriteIds; import org.apache.hadoop.hive.metastore.api.TxnAbortedException; import org.apache.hadoop.hive.metastore.api.TxnInfo; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService; +import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; @@ -412,13 +417,13 @@ public class TestStreaming { rs = queryTable(driver, "select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID"); Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar")); - Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/base_0000005/bucket_00000")); + Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/base_0000005_v0000025/bucket_00000")); Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4")); - Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/base_0000005/bucket_00000")); + Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/base_0000005_v0000025/bucket_00000")); Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6")); - Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/base_0000005/bucket_00000")); + Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/base_0000005_v0000025/bucket_00000")); Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t0\t0")); - Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/base_0000005/bucket_00000")); + Assert.assertTrue(rs.get(3), 
rs.get(3).endsWith("streamingnobuckets/base_0000005_v0000025/bucket_00000")); } @Test @@ -762,17 +767,17 @@ public class TestStreaming { rs = queryTable(driver, "select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID"); Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar")); - Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/base_0000009/bucket_00000")); + Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/base_0000009_v0000029/bucket_00000")); Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4")); - Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/base_0000009/bucket_00000")); + Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/base_0000009_v0000029/bucket_00000")); Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6")); - Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/base_0000009/bucket_00000")); + Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/base_0000009_v0000029/bucket_00000")); Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\ta11\tb12")); - Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/base_0000009/bucket_00000")); + Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/base_0000009_v0000029/bucket_00000")); Assert.assertTrue(rs.get(4), rs.get(4).startsWith("{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\ta13\tb14")); - Assert.assertTrue(rs.get(4), rs.get(4).endsWith("streamingnobuckets/base_0000009/bucket_00000")); + Assert.assertTrue(rs.get(4), rs.get(4).endsWith("streamingnobuckets/base_0000009_v0000029/bucket_00000")); Assert.assertTrue(rs.get(5), rs.get(5).startsWith("{\"writeid\":6,\"bucketid\":536870912,\"rowid\":0}\t0\t0")); - Assert.assertTrue(rs.get(5), rs.get(5).endsWith("streamingnobuckets/base_0000009/bucket_00000")); + Assert.assertTrue(rs.get(5), rs.get(5).endsWith("streamingnobuckets/base_0000009_v0000029/bucket_00000")); } /** @@ -940,7 +945,7 @@ public class TestStreaming { @Deprecated private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int buckets, int numExpectedFiles, String... 
records) throws Exception { - ValidWriteIdList writeIds = msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName)); + ValidWriteIdList writeIds = getTransactionContext(conf); AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, writeIds); Assert.assertEquals(0, dir.getObsolete().size()); Assert.assertEquals(0, dir.getOriginalFiles().size()); @@ -973,7 +978,8 @@ public class TestStreaming { job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, "bigint:string"); AcidUtils.setAcidOperationalProperties(job, true, null); job.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true); - job.set(ValidWriteIdList.VALID_WRITEIDS_KEY, writeIds.toString()); + job.set(ValidWriteIdList.VALID_WRITEIDS_KEY, writeIds.writeToString()); + job.set(ValidTxnList.VALID_TXNS_KEY, conf.get(ValidTxnList.VALID_TXNS_KEY)); InputSplit[] splits = inf.getSplits(job, buckets); Assert.assertEquals(numExpectedFiles, splits.length); org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr = @@ -994,8 +1000,7 @@ public class TestStreaming { */ private void checkDataWritten2(Path partitionPath, long minTxn, long maxTxn, int numExpectedFiles, String validationQuery, boolean vectorize, String... records) throws Exception { - ValidWriteIdList txns = msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName)); - AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, txns); + AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, getTransactionContext(conf)); Assert.assertEquals(0, dir.getObsolete().size()); Assert.assertEquals(0, dir.getOriginalFiles().size()); List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories(); @@ -1038,9 +1043,15 @@ public class TestStreaming { conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, isVectorizationEnabled); } + private ValidWriteIdList getTransactionContext(Configuration conf) throws Exception { + ValidTxnList validTxnList = msClient.getValidTxns(); + conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString()); + List<TableValidWriteIds> v = msClient.getValidWriteIds(Collections + .singletonList(TableName.getDbTable(dbName, tblName)), validTxnList.writeToString()); + return TxnCommonUtils.createValidReaderWriteIdList(v.get(0)); + } private void checkNothingWritten(Path partitionPath) throws Exception { - ValidWriteIdList writeIds = msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName)); - AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, writeIds); + AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, getTransactionContext(conf)); Assert.assertEquals(0, dir.getObsolete().size()); Assert.assertEquals(0, dir.getOriginalFiles().size()); List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories(); @@ -1937,8 +1948,7 @@ public class TestStreaming { /*now both batches have committed (but not closed) so we for each primary file we expect a side file to exist and indicate the true length of primary file*/ FileSystem fs = partLoc.getFileSystem(conf); - AcidUtils.Directory dir = AcidUtils.getAcidState(partLoc, conf, - msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName))); + AcidUtils.Directory dir = AcidUtils.getAcidState(partLoc, conf, getTransactionContext(conf)); for (AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) { for (FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) { Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath()); @@ -1963,7 +1973,7 @@ public class TestStreaming 
{ //so each of 2 deltas has 1 bucket0 and 1 bucket0_flush_length. Furthermore, each bucket0 //has now received more data(logically - it's buffered) but it is not yet committed. //lets check that side files exist, etc - dir = AcidUtils.getAcidState(partLoc, conf, msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName))); + dir = AcidUtils.getAcidState(partLoc, conf, getTransactionContext(conf)); for (AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) { for (FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) { Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath());