Repository: hive

Updated Branches:
  refs/heads/branch-3 656af1411 -> cfdb272c7
HIVE-19260 - Streaming Ingest API doesn't normalize db.table names (Eugene Koifman, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cfdb272c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cfdb272c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cfdb272c

Branch: refs/heads/branch-3
Commit: cfdb272c79bbdfa8b3c0c6481ec057cd91d53ad7
Parents: 656af14
Author: Eugene Koifman <ekoif...@apache.org>
Authored: Wed Apr 25 12:02:19 2018 -0700
Committer: Eugene Koifman <ekoif...@apache.org>
Committed: Wed Apr 25 12:02:19 2018 -0700

----------------------------------------------------------------------
 .../hive/hcatalog/streaming/HiveEndPoint.java   |   4 +-
 .../hive/hcatalog/streaming/TestStreaming.java  |  11 +-
 .../hadoop/hive/ql/txn/compactor/Cleaner.java   | 119 +++++++++++++++++--
 .../hadoop/hive/ql/txn/compactor/Initiator.java |   2 +-
 .../hadoop/hive/metastore/txn/TxnHandler.java   |  22 ++--
 .../apache/hive/streaming/TestStreaming.java    |   9 +-
 6 files changed, 139 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/cfdb272c/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
index 6d248ea..8582e9a 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
@@ -88,13 +88,13 @@ public class HiveEndPoint {
     if (database==null) {
       throw new IllegalArgumentException("Database cannot be null for HiveEndPoint");
     }
-    this.database = database;
-    this.table = table;
+    this.database = database.toLowerCase();
     if (table==null) {
       throw new IllegalArgumentException("Table cannot be null for HiveEndPoint");
     }
     this.partitionVals = partitionVals==null ? new ArrayList<String>()
         : new ArrayList<String>( partitionVals );
+    this.table = table.toLowerCase();
   }
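
The upshot of the HiveEndPoint change: identifiers are lower-cased once, at construction, so every later consumer of the endpoint sees normalized names. A minimal sketch of the new behavior (the metastore URI and names are illustrative, and the assertions assume the class's public final database/table fields):

    // Illustrative only; URI and names are made up for this sketch.
    HiveEndPoint ep = new HiveEndPoint("thrift://localhost:9083",
        "Default", "StreamingNoBuckets", null);
    assert "default".equals(ep.database);         // lower-cased at construction
    assert "streamingnobuckets".equals(ep.table); // ditto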

http://git-wip-us.apache.org/repos/asf/hive/blob/cfdb272c/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index 3733e3d..fe2b1c1 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -67,6 +67,8 @@ import org.apache.hadoop.hive.metastore.api.TxnState;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.ql.DriverFactory;
 import org.apache.hadoop.hive.ql.IDriver;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -353,10 +355,10 @@ public class TestStreaming {
     //todo: why does it need transactional_properties?
     queryTable(driver, "create table default.streamingnobuckets (a string, b string) stored as orc TBLPROPERTIES('transactional'='true', 'transactional_properties'='default')");
     queryTable(driver, "insert into default.streamingnobuckets values('foo','bar')");
-    List<String> rs = queryTable(driver, "select * from default.streamingnobuckets");
+    List<String> rs = queryTable(driver, "select * from default.streamingNoBuckets");
     Assert.assertEquals(1, rs.size());
     Assert.assertEquals("foo\tbar", rs.get(0));
-    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "default", "streamingnobuckets", null);
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "Default", "StreamingNoBuckets", null);
     String[] colNames1 = new String[] { "a", "b" };
     StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
     DelimitedInputWriter wr = new DelimitedInputWriter(colNames1,",", endPt, connection);
@@ -365,6 +367,11 @@ public class TestStreaming {
     txnBatch.beginNextTransaction();
     txnBatch.write("a1,b2".getBytes());
     txnBatch.write("a3,b4".getBytes());
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+    ShowLocksResponse resp = txnHandler.showLocks(new ShowLocksRequest());
+    Assert.assertEquals(resp.getLocksSize(), 1);
+    Assert.assertEquals("streamingnobuckets", resp.getLocks().get(0).getTablename());
+    Assert.assertEquals("default", resp.getLocks().get(0).getDbname());
     txnBatch.commit();
     txnBatch.beginNextTransaction();
     txnBatch.write("a5,b6".getBytes());
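
The mixed-case endpoint in the test above exercises the fix end to end: a client may say "Default"/"StreamingNoBuckets", but showLocks must report "default"/"streamingnobuckets". A sketch of the mismatch this guards against (values illustrative):

    String supplied = "StreamingNoBuckets";   // what a client passed in
    String stored = "streamingnobuckets";     // what the metastore now records
    assert !supplied.equals(stored);          // a case-sensitive compare misses
    assert supplied.equalsIgnoreCase(stored); // matching must ignore case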

http://git-wip-us.apache.org/repos/asf/hive/blob/cfdb272c/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 4d51bbc..fe6d2d6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;
 
+import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.slf4j.Logger;
@@ -43,8 +44,11 @@ import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.BitSet;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -125,7 +129,9 @@
       }
       if (toClean.size() > 0 || compactId2LockMap.size() > 0) {
         ShowLocksResponse locksResponse = txnHandler.showLocks(new ShowLocksRequest());
-
+        if(LOG.isDebugEnabled()) {
+          dumpLockState(locksResponse);
+        }
         for (CompactionInfo ci : toClean) {
           // Check to see if we have seen this request before.  If so, ignore it.  If not,
           // add it to our queue.
@@ -163,6 +169,9 @@
           for (Long lockId : expiredLocks) {
             queueEntry.getValue().remove(lockId);
           }
+          LOG.info("Skipping cleaning of " +
+              idWatermark(compactId2CompactInfoMap.get(queueEntry.getKey())) +
+              " due to reader present: " + queueEntry.getValue());
         }
       }
     } finally {
@@ -203,9 +212,18 @@
   private Set<Long> findRelatedLocks(CompactionInfo ci, ShowLocksResponse locksResponse) {
     Set<Long> relatedLocks = new HashSet<Long>();
     for (ShowLocksResponseElement lock : locksResponse.getLocks()) {
-      if (ci.dbname.equals(lock.getDbname())) {
+      /**
+       * Hive QL is not case sensitive wrt db/table/column names
+       * Partition names get
+       * normalized (as far as I can tell) by lower casing column name but not partition value.
+       * {@link org.apache.hadoop.hive.metastore.Warehouse#makePartName(List, List, String)}
+       * {@link org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer#getPartSpec(ASTNode)}
+       * Since user input may start out in any case, compare here case-insensitive for db/table
+       * but leave partition name as is.
+       */
+      if (ci.dbname.equalsIgnoreCase(lock.getDbname())) {
         if ((ci.tableName == null && lock.getTablename() == null) ||
-            (ci.tableName != null && ci.tableName.equals(lock.getTablename()))) {
+            (ci.tableName != null && ci.tableName.equalsIgnoreCase(lock.getTablename()))) {
           if ((ci.partName == null && lock.getPartname() == null) ||
               (ci.partName != null && ci.partName.equals(lock.getPartname()))) {
             relatedLocks.add(lock.getLockid());
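
Restating the rule from the comment above: database and table names match case-insensitively, while partition names must match exactly. A standalone sketch of the predicate (a hypothetical helper, not the committed code):

    // Hypothetical helper mirroring findRelatedLocks' matching rule.
    static boolean sameResource(String db1, String tbl1, String part1,
                                String db2, String tbl2, String part2) {
      if (!db1.equalsIgnoreCase(db2)) {
        return false;                                        // db: case-insensitive
      }
      boolean tableMatch = (tbl1 == null && tbl2 == null)
          || (tbl1 != null && tbl1.equalsIgnoreCase(tbl2));  // table: case-insensitive
      boolean partMatch = (part1 == null && part2 == null)
          || (part1 != null && part1.equals(part2));         // partition: exact
      return tableMatch && partMatch;
    }

So ("Default", "T1", "ds=2018-04-25") matches ("default", "t1", "ds=2018-04-25"), but a partition value differing only in case does not.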
@@ -226,12 +244,13 @@
   }
 
   private void clean(CompactionInfo ci) throws MetaException {
-    LOG.info("Starting cleaning for " + ci.getFullPartitionName());
+    LOG.info("Starting cleaning for " + ci);
     try {
       Table t = resolveTable(ci);
       if (t == null) {
         // The table was dropped before we got around to cleaning it.
-        LOG.info("Unable to find table " + ci.getFullTableName() + ", assuming it was dropped");
+        LOG.info("Unable to find table " + ci.getFullTableName() + ", assuming it was dropped." +
+            idWatermark(ci));
         txnHandler.markCleaned(ci);
         return;
       }
@@ -241,7 +260,7 @@
         if (p == null) {
           // The partition was dropped before we got around to cleaning it.
           LOG.info("Unable to find partition " + ci.getFullPartitionName() +
-              ", assuming it was dropped");
+              ", assuming it was dropped." + idWatermark(ci));
           txnHandler.markCleaned(ci);
           return;
         }
@@ -271,7 +290,7 @@
           : new ValidReaderWriteIdList();
 
       if (runJobAsSelf(ci.runAs)) {
-        removeFiles(location, validWriteIdList);
+        removeFiles(location, validWriteIdList, ci);
       } else {
         LOG.info("Cleaning as user " + ci.runAs + " for " + ci.getFullPartitionName());
         UserGroupInformation ugi = UserGroupInformation.createProxyUser(ci.runAs,
@@ -279,7 +298,7 @@
           ugi.doAs(new PrivilegedExceptionAction<Object>() {
             @Override
             public Object run() throws Exception {
-              removeFiles(location, validWriteIdList);
+              removeFiles(location, validWriteIdList, ci);
               return null;
             }
           });
@@ -287,7 +306,7 @@
           FileSystem.closeAllForUGI(ugi);
         } catch (IOException exception) {
           LOG.error("Could not clean up file-system handles for UGI: " + ugi + " for " +
-              ci.getFullPartitionName(), exception);
+              ci.getFullPartitionName() + idWatermark(ci), exception);
         }
       }
       txnHandler.markCleaned(ci);
@@ -297,20 +316,35 @@
       txnHandler.markFailed(ci);
     }
   }
-
-  private void removeFiles(String location, ValidWriteIdList writeIdList) throws IOException {
-    AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(location), conf, writeIdList);
+  private static String idWatermark(CompactionInfo ci) {
+    return " id=" + ci.id;
+  }
+  private void removeFiles(String location, ValidWriteIdList writeIdList, CompactionInfo ci)
+      throws IOException {
+    Path locPath = new Path(location);
+    AcidUtils.Directory dir = AcidUtils.getAcidState(locPath, conf, writeIdList);
     List<FileStatus> obsoleteDirs = dir.getObsolete();
     List<Path> filesToDelete = new ArrayList<Path>(obsoleteDirs.size());
+    StringBuilder extraDebugInfo = new StringBuilder("[");
     for (FileStatus stat : obsoleteDirs) {
       filesToDelete.add(stat.getPath());
+      extraDebugInfo.append(stat.getPath().getName()).append(",");
+      if(!FileUtils.isPathWithinSubtree(stat.getPath(), locPath)) {
+        LOG.info(idWatermark(ci) + " found unexpected file: " + stat.getPath());
+      }
     }
+    extraDebugInfo.setCharAt(extraDebugInfo.length() - 1, ']');
+    List<Long> compactIds = new ArrayList<>(compactId2CompactInfoMap.keySet());
+    Collections.sort(compactIds);
+    extraDebugInfo.append("compactId2CompactInfoMap.keySet(").append(compactIds).append(")");
+    LOG.info(idWatermark(ci) + " About to remove " + filesToDelete.size() +
+        " obsolete directories from " + location + ". " + extraDebugInfo.toString());
     if (filesToDelete.size() < 1) {
       LOG.warn("Hmm, nothing to delete in the cleaner for directory " + location +
           ", that hardly seems right.");
       return;
     }
-    LOG.info("About to remove " + filesToDelete.size() + " obsolete directories from " + location);
+
     FileSystem fs = filesToDelete.get(0).getFileSystem(conf);
 
     for (Path dead : filesToDelete) {
@@ -319,4 +353,63 @@
       fs.delete(dead, true);
     }
   }
+  private static class LockComparator implements Comparator<ShowLocksResponseElement> {
+    //sort ascending by resource, nulls first
+    @Override
+    public int compare(ShowLocksResponseElement o1, ShowLocksResponseElement o2) {
+      if(o1 == o2) {
+        return 0;
+      }
+      if(o1 == null) {
+        return -1;
+      }
+      if(o2 == null) {
+        return 1;
+      }
+      int v = o1.getDbname().compareToIgnoreCase(o2.getDbname());
+      if(v != 0) {
+        return v;
+      }
+      if(o1.getTablename() == null) {
+        return -1;
+      }
+      if(o2.getTablename() == null) {
+        return 1;
+      }
+      v = o1.getTablename().compareToIgnoreCase(o2.getTablename());
+      if(v != 0) {
+        return v;
+      }
+      if(o1.getPartname() == null) {
+        return -1;
+      }
+      if(o2.getPartname() == null) {
+        return 1;
+      }
+      v = o1.getPartname().compareToIgnoreCase(o2.getPartname());
+      if(v != 0) {
+        return v;
+      }
+      //if still equal, compare by lock ids
+      v = Long.compare(o1.getLockid(), o2.getLockid());
+      if(v != 0) {
+        return v;
+      }
+      return Long.compare(o1.getLockIdInternal(), o2.getLockIdInternal());
+
+    }
+  }
+  private void dumpLockState(ShowLocksResponse slr) {
+    Iterator<ShowLocksResponseElement> l = slr.getLocksIterator();
+    List<ShowLocksResponseElement> sortedList = new ArrayList<>();
+    while(l.hasNext()) {
+      sortedList.add(l.next());
+    }
+    //sort for readability
+    sortedList.sort(new LockComparator());
+    LOG.info("dumping locks");
+    for(ShowLocksResponseElement lock : sortedList) {
+      LOG.info(lock.toString());
+    }
+  }
 }
" + extraDebugInfo.toString()); if (filesToDelete.size() < 1) { LOG.warn("Hmm, nothing to delete in the cleaner for directory " + location + ", that hardly seems right."); return; } - LOG.info("About to remove " + filesToDelete.size() + " obsolete directories from " + location); + FileSystem fs = filesToDelete.get(0).getFileSystem(conf); for (Path dead : filesToDelete) { @@ -319,4 +353,63 @@ public class Cleaner extends CompactorThread { fs.delete(dead, true); } } + private static class LockComparator implements Comparator<ShowLocksResponseElement> { + //sort ascending by resource, nulls first + @Override + public int compare(ShowLocksResponseElement o1, ShowLocksResponseElement o2) { + if(o1 == o2) { + return 0; + } + if(o1 == null) { + return -1; + } + if(o2 == null) { + return 1; + } + int v = o1.getDbname().compareToIgnoreCase(o2.getDbname()); + if(v != 0) { + return v; + } + if(o1.getTablename() == null) { + return -1; + } + if(o2.getTablename() == null) { + return 1; + } + v = o1.getTablename().compareToIgnoreCase(o2.getTablename()); + if(v != 0) { + return v; + } + if(o1.getPartname() == null) { + return -1; + } + if(o2.getPartname() == null) { + return 1; + } + v = o1.getPartname().compareToIgnoreCase(o2.getPartname()); + if(v != 0) { + return v; + } + //if still equal, compare by lock ids + v = Long.compare(o1.getLockid(), o2.getLockid()); + if(v != 0) { + return v; + } + return Long.compare(o1.getLockIdInternal(), o2.getLockIdInternal()); + + } + } + private void dumpLockState(ShowLocksResponse slr) { + Iterator<ShowLocksResponseElement> l = slr.getLocksIterator(); + List<ShowLocksResponseElement> sortedList = new ArrayList<>(); + while(l.hasNext()) { + sortedList.add(l.next()); + } + //sort for readability + sortedList.sort(new LockComparator()); + LOG.info("dumping locks"); + for(ShowLocksResponseElement lock : sortedList) { + LOG.info(lock.toString()); + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/cfdb272c/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index 22765b8..c95daaf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -248,7 +248,7 @@ public class Initiator extends CompactorThread { if (runJobAsSelf(runAs)) { return determineCompactionType(ci, writeIds, sd, tblproperties); } else { - LOG.info("Going to initiate as user " + runAs); + LOG.info("Going to initiate as user " + runAs + " for " + ci.getFullPartitionName()); UserGroupInformation ugi = UserGroupInformation.createProxyUser(runAs, UserGroupInformation.getLoginUser()); CompactionType compactionType = ugi.doAs(new PrivilegedExceptionAction<CompactionType>() { http://git-wip-us.apache.org/repos/asf/hive/blob/cfdb272c/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 39a0f31..db596a6 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ 

http://git-wip-us.apache.org/repos/asf/hive/blob/cfdb272c/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 39a0f31..db596a6 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -1656,9 +1656,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         if(!updateTxnComponents) {
           continue;
         }
-        String dbName = lc.getDbname();
-        String tblName = lc.getTablename();
-        String partName = lc.getPartitionname();
+        String dbName = normalizeCase(lc.getDbname());
+        String tblName = normalizeCase(lc.getTablename());
+        String partName = normalizeCase(lc.getPartitionname());
         Long writeId = null;
         if (tblName != null) {
           // It is assumed the caller have already allocated write id for adding/updating data to
@@ -1666,8 +1666,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           // may return empty result sets.
           // Get the write id allocated by this txn for the given table writes
           s = "select t2w_writeid from TXN_TO_WRITE_ID where"
-              + " t2w_database = " + quoteString(dbName.toLowerCase())
-              + " and t2w_table = " + quoteString(tblName.toLowerCase())
+              + " t2w_database = " + quoteString(dbName)
+              + " and t2w_table = " + quoteString(tblName)
              + " and t2w_txnid = " + txnid;
           LOG.debug("Going to execute query <" + s + ">");
           rs = stmt.executeQuery(s);
@@ -1704,9 +1704,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
               + lc + " agentInfo=" + rqst.getAgentInfo());
         }
         intLockId++;
-        String dbName = lc.getDbname();
-        String tblName = lc.getTablename();
-        String partName = lc.getPartitionname();
+        String dbName = normalizeCase(lc.getDbname());
+        String tblName = normalizeCase(lc.getTablename());
+        String partName = normalizeCase(lc.getPartitionname());
         LockType lockType = lc.getType();
         char lockChar = 'z';
         switch (lockType) {
@@ -1764,6 +1764,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       return enqueueLockWithRetry(rqst);
     }
   }
+  private static String normalizeCase(String s) {
+    return s == null ? null : s.toLowerCase();
+  }
   private LockResponse checkLockWithRetry(Connection dbConn, long extLockId, long txnId)
       throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, MetaException {
     try {
@@ -2385,7 +2388,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     Long writeId = rqst.getWriteid();
     List<String> rows = new ArrayList<>();
     for (String partName : rqst.getPartitionnames()) {
-      rows.add(rqst.getTxnid() + "," + quoteString(rqst.getDbname()) + "," + quoteString(rqst.getTablename()) +
+      rows.add(rqst.getTxnid() + "," + quoteString(normalizeCase(rqst.getDbname()))
+          + "," + quoteString(normalizeCase(rqst.getTablename()))
          + "," + quoteString(partName) + "," + quoteChar(ot.sqlConst) + "," + writeId);
     }
     int modCount = 0;
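
With every identifier routed through normalizeCase before it is written, rows in TXN_COMPONENTS, HIVE_LOCKS and the TXN_TO_WRITE_ID lookup all see lowercase names no matter what case the client used. A short illustration (inputs are made up; normalizeCase is the private helper added above):

    String dbName = normalizeCase("Default");             // -> "default"
    String tblName = normalizeCase("StreamingNoBuckets"); // -> "streamingnobuckets"
    String partName = normalizeCase(null);                // -> null (null-safe)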

http://git-wip-us.apache.org/repos/asf/hive/blob/cfdb272c/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
index e5dd3b3..3343d10 100644
--- a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
+++ b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
@@ -69,6 +69,8 @@ import org.apache.hadoop.hive.metastore.api.TxnState;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.ql.DriverFactory;
 import org.apache.hadoop.hive.ql.IDriver;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -356,7 +358,7 @@ public class TestStreaming {
     List<String> rs = queryTable(driver, "select * from default.streamingnobuckets");
     Assert.assertEquals(1, rs.size());
     Assert.assertEquals("foo\tbar", rs.get(0));
-    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "default", "streamingnobuckets", null);
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "Default", "streamingNoBuckets", null);
     String[] colNames1 = new String[] { "a", "b" };
     StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
     DelimitedInputWriter wr = new DelimitedInputWriter(colNames1,",", endPt, connection);
@@ -365,6 +367,11 @@ public class TestStreaming {
     txnBatch.beginNextTransaction();
     txnBatch.write("a1,b2".getBytes());
     txnBatch.write("a3,b4".getBytes());
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+    ShowLocksResponse resp = txnHandler.showLocks(new ShowLocksRequest());
+    Assert.assertEquals(resp.getLocksSize(), 1);
+    Assert.assertEquals("streamingnobuckets", resp.getLocks().get(0).getTablename());
+    Assert.assertEquals("default", resp.getLocks().get(0).getDbname());
     txnBatch.commit();
     txnBatch.beginNextTransaction();
     txnBatch.write("a5,b6".getBytes());