HIVE-10595 Dropping a table can cause NPEs in the compactor (Alan Gates, reviewed by Eugene Koifman)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c156b32b Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c156b32b Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c156b32b Branch: refs/heads/beeline-cli Commit: c156b32b49aeb5943e45a68fc7600c9244afb128 Parents: 72088ca Author: Alan Gates <ga...@hortonworks.com> Authored: Thu May 7 12:49:21 2015 +0100 Committer: Alan Gates <ga...@hortonworks.com> Committed: Thu May 7 12:49:21 2015 +0100 ---------------------------------------------------------------------- .../hadoop/hive/ql/txn/compactor/Cleaner.java | 20 ++++++- .../hive/ql/txn/compactor/CompactorThread.java | 12 ++-- .../hadoop/hive/ql/txn/compactor/Initiator.java | 11 +++- .../hadoop/hive/ql/txn/compactor/Worker.java | 12 ++++ .../hive/ql/txn/compactor/TestCleaner.java | 56 ++++++++++++++++- .../hive/ql/txn/compactor/TestInitiator.java | 63 +++++++++++++++++++- .../hive/ql/txn/compactor/TestWorker.java | 45 ++++++++++++++ 7 files changed, 207 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/c156b32b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java index 83b0d3d..16d2c81 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java @@ -26,10 +26,12 @@ import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidReadTxnList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Partition; import 
org.apache.hadoop.hive.metastore.api.ShowLocksRequest; import org.apache.hadoop.hive.metastore.api.ShowLocksResponse; import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.security.UserGroupInformation; @@ -183,7 +185,23 @@ public class Cleaner extends CompactorThread { private void clean(CompactionInfo ci) throws MetaException { LOG.info("Starting cleaning for " + ci.getFullPartitionName()); try { - StorageDescriptor sd = resolveStorageDescriptor(resolveTable(ci), resolvePartition(ci)); + Table t = resolveTable(ci); + if (t == null) { + // The table was dropped before we got around to cleaning it. + LOG.info("Unable to find table " + ci.getFullTableName() + ", assuming it was dropped"); + return; + } + Partition p = null; + if (ci.partName != null) { + p = resolvePartition(ci); + if (p == null) { + // The partition was dropped before we got around to cleaning it. 
+ LOG.info("Unable to find partition " + ci.getFullPartitionName() + + ", assuming it was dropped"); + return; + } + } + StorageDescriptor sd = resolveStorageDescriptor(t, p); final String location = sd.getLocation(); // Create a bogus validTxnList with a high water mark set to MAX_LONG and no open http://git-wip-us.apache.org/repos/asf/hive/blob/c156b32b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java index 7d097fd..38cd95e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java @@ -32,13 +32,13 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler; -import org.apache.hadoop.hive.metastore.txn.TxnHandler; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; @@ -105,13 +105,15 @@ abstract class CompactorThread extends Thread implements MetaStoreThread { * one partition. 
*/ protected Partition resolvePartition(CompactionInfo ci) throws Exception { - Partition p = null; if (ci.partName != null) { - List<String> names = new ArrayList<String>(1); - names.add(ci.partName); List<Partition> parts = null; try { - parts = rs.getPartitionsByNames(ci.dbname, ci.tableName, names); + parts = rs.getPartitionsByNames(ci.dbname, ci.tableName, + Collections.singletonList(ci.partName)); + if (parts == null || parts.size() == 0) { + // The partition got dropped before we went looking for it. + return null; + } } catch (Exception e) { LOG.error("Unable to find partition " + ci.getFullPartitionName() + ", " + e.getMessage()); throw e; http://git-wip-us.apache.org/repos/asf/hive/blob/c156b32b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index f706ac1..847d751 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -85,13 +85,13 @@ public class Initiator extends CompactorThread { LOG.debug("Found " + potentials.size() + " potential compactions, " + "checking to see if we should compact any of them"); for (CompactionInfo ci : potentials) { - LOG.debug("Checking to see if we should compact " + ci.getFullPartitionName()); + LOG.info("Checking to see if we should compact " + ci.getFullPartitionName()); try { Table t = resolveTable(ci); if (t == null) { // Most likely this means it's a temp table - LOG.debug("Can't find table " + ci.getFullTableName() + ", assuming it's a temp " + - "table and moving on."); + LOG.info("Can't find table " + ci.getFullTableName() + ", assuming it's a temp " + + "table or has been dropped and moving on."); continue; } @@ -121,6 +121,11 @@ public class Initiator extends CompactorThread { // 
Figure out who we should run the file operations as Partition p = resolvePartition(ci); + if (p == null && ci.partName != null) { + LOG.info("Can't find partition " + ci.getFullPartitionName() + + ", assuming it has been dropped and moving on."); + continue; + } StorageDescriptor sd = resolveStorageDescriptor(t, p); String runAs = findUserToRunAs(sd.getLocation(), t); http://git-wip-us.apache.org/repos/asf/hive/blob/c156b32b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java index 3ce9ffd..f26225a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java @@ -94,6 +94,12 @@ public class Worker extends CompactorThread { Table t1 = null; try { t1 = resolveTable(ci); + if (t1 == null) { + LOG.info("Unable to find table " + ci.getFullTableName() + + ", assuming it was dropped and moving on."); + txnHandler.markCleaned(ci); + continue; + } } catch (MetaException e) { txnHandler.markCleaned(ci); continue; @@ -106,6 +112,12 @@ public class Worker extends CompactorThread { Partition p = null; try { p = resolvePartition(ci); + if (p == null && ci.partName != null) { + LOG.info("Unable to find partition " + ci.getFullPartitionName() + + ", assuming it was dropped and moving on."); + txnHandler.markCleaned(ci); + continue; + } } catch (Exception e) { txnHandler.markCleaned(ci); continue; http://git-wip-us.apache.org/repos/asf/hive/blob/c156b32b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java index 7687851..ffdbb9a 100644 --- 
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java @@ -17,17 +17,17 @@ */ package org.apache.hadoop.hive.ql.txn.compactor; -import junit.framework.Assert; +import org.junit.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.MetaStoreThread; import org.apache.hadoop.hive.metastore.api.*; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; import org.junit.Test; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -428,4 +428,56 @@ public class TestCleaner extends CompactorTest { Assert.assertEquals(1, paths.size()); Assert.assertEquals("base_25", paths.get(0).getName()); } + + @Test + public void droppedTable() throws Exception { + Table t = newTable("default", "dt", false); + + addDeltaFile(t, null, 1L, 22L, 22); + addDeltaFile(t, null, 23L, 24L, 2); + addBaseFile(t, null, 25L, 25); + + burnThroughTransactions(25); + + CompactionRequest rqst = new CompactionRequest("default", "dt", CompactionType.MINOR); + txnHandler.compact(rqst); + CompactionInfo ci = txnHandler.findNextToCompact("fred"); + txnHandler.markCompacted(ci); + txnHandler.setRunAs(ci.id, System.getProperty("user.name")); + + ms.dropTable("default", "dt"); + + startCleaner(); + + // Check there are no compaction requests left. 
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + Assert.assertEquals(0, rsp.getCompactsSize()); + } + + @Test + public void droppedPartition() throws Exception { + Table t = newTable("default", "dp", true); + Partition p = newPartition(t, "today"); + + addDeltaFile(t, p, 1L, 22L, 22); + addDeltaFile(t, p, 23L, 24L, 2); + addBaseFile(t, p, 25L, 25); + + burnThroughTransactions(25); + + CompactionRequest rqst = new CompactionRequest("default", "dp", CompactionType.MAJOR); + rqst.setPartitionname("ds=today"); + txnHandler.compact(rqst); + CompactionInfo ci = txnHandler.findNextToCompact("fred"); + txnHandler.markCompacted(ci); + txnHandler.setRunAs(ci.id, System.getProperty("user.name")); + + ms.dropPartition("default", "dp", Collections.singletonList("today"), true); + + startCleaner(); + + // Check there are no compaction requests left. + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + Assert.assertEquals(0, rsp.getCompactsSize()); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/c156b32b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java index 1a9cbca..00b13de 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java @@ -17,7 +17,7 @@ */ package org.apache.hadoop.hive.ql.txn.compactor; -import junit.framework.Assert; +import org.junit.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; @@ -27,6 +27,7 @@ import org.junit.Before; import org.junit.Test; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import 
java.util.Map; @@ -653,4 +654,64 @@ public class TestInitiator extends CompactorTest { Assert.assertEquals(0, compacts.size()); } + @Test + public void dropTable() throws Exception { + Table t = newTable("default", "dt", false); + + addBaseFile(t, null, 20L, 20); + addDeltaFile(t, null, 21L, 22L, 2); + addDeltaFile(t, null, 23L, 24L, 2); + + burnThroughTransactions(23); + + long txnid = openTxn(); + LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default"); + comp.setTablename("dt"); + List<LockComponent> components = new ArrayList<LockComponent>(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + req.setTxnid(txnid); + LockResponse res = txnHandler.lock(req); + txnHandler.commitTxn(new CommitTxnRequest(txnid)); + + ms.dropTable("default", "dt"); + + startInitiator(); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List<ShowCompactResponseElement> compacts = rsp.getCompacts(); + Assert.assertEquals(0, compacts.size()); + } + + @Test + public void dropPartition() throws Exception { + Table t = newTable("default", "dp", true); + Partition p = newPartition(t, "today"); + + addBaseFile(t, p, 20L, 20); + addDeltaFile(t, p, 21L, 22L, 2); + addDeltaFile(t, p, 23L, 24L, 2); + + burnThroughTransactions(23); + + long txnid = openTxn(); + LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default"); + comp.setTablename("dp"); + comp.setPartitionname("ds=today"); + List<LockComponent> components = new ArrayList<LockComponent>(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + req.setTxnid(txnid); + LockResponse res = txnHandler.lock(req); + txnHandler.commitTxn(new CommitTxnRequest(txnid)); + + ms.dropPartition("default", "dp", Collections.singletonList("today"), true); + + startInitiator(); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + 
List<ShowCompactResponseElement> compacts = rsp.getCompacts(); + Assert.assertEquals(0, compacts.size()); + } + } http://git-wip-us.apache.org/repos/asf/hive/blob/c156b32b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java index 78a7f9e..bebac54 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java @@ -29,6 +29,7 @@ import org.junit.Test; import java.io.*; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -799,4 +800,48 @@ public class TestWorker extends CompactorTest { Assert.assertEquals("delta_23_25", stat[3].getPath().getName()); Assert.assertEquals("delta_26_27", stat[4].getPath().getName()); } + + @Test + public void droppedTable() throws Exception { + Table t = newTable("default", "dt", false); + + addDeltaFile(t, null, 1L, 2L, 2); + addDeltaFile(t, null, 3L, 4L, 2); + burnThroughTransactions(4); + + CompactionRequest rqst = new CompactionRequest("default", "dt", CompactionType.MAJOR); + txnHandler.compact(rqst); + + ms.dropTable("default", "dt"); + + startWorker(); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List<ShowCompactResponseElement> compacts = rsp.getCompacts(); + Assert.assertEquals(0, compacts.size()); + } + + @Test + public void droppedPartition() throws Exception { + Table t = newTable("default", "dp", true); + Partition p = newPartition(t, "today"); + + addBaseFile(t, p, 20L, 20); + addDeltaFile(t, p, 21L, 22L, 2); + addDeltaFile(t, p, 23L, 24L, 2); + + burnThroughTransactions(25); + + CompactionRequest rqst = new CompactionRequest("default", "dp", 
CompactionType.MINOR); + rqst.setPartitionname("ds=today"); + txnHandler.compact(rqst); + + ms.dropPartition("default", "dp", Collections.singletonList("today"), true); + + startWorker(); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List<ShowCompactResponseElement> compacts = rsp.getCompacts(); + Assert.assertEquals(0, compacts.size()); + } }