Repository: hive Updated Branches: refs/heads/branch-2.1 4af1be71e -> 77f563f26
HIVE-11956 SHOW LOCKS should indicate what acquired the lock(Eugene Koifman, reviewed by Wei Zheng) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a1fe6829 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a1fe6829 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a1fe6829 Branch: refs/heads/branch-2.1 Commit: a1fe68295bc475a5ace5153a005196142cb8a8a8 Parents: 4af1be7 Author: Eugene Koifman <ekoif...@hortonworks.com> Authored: Tue May 31 20:26:18 2016 -0700 Committer: Eugene Koifman <ekoif...@hortonworks.com> Committed: Tue May 31 20:26:18 2016 -0700 ---------------------------------------------------------------------- .../hive/hcatalog/streaming/HiveEndPoint.java | 79 ++++++++++++++------ .../streaming/StreamingIntegrationTester.java | 2 +- .../hive/hcatalog/streaming/TestStreaming.java | 74 ++++++++++-------- .../hive/ql/txn/compactor/TestCompactor.java | 11 +-- .../hadoop/hive/metastore/txn/TxnHandler.java | 37 ++++++--- .../org/apache/hadoop/hive/ql/exec/DDLTask.java | 4 + .../hadoop/hive/ql/plan/ShowLocksDesc.java | 4 +- .../hive/ql/lockmgr/TestDbTxnManager2.java | 12 +++ .../clientpositive/dbtxnmgr_showlocks.q.out | 16 ++-- 9 files changed, 160 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/a1fe6829/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java index cb64fff..017f565 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java @@ -97,21 +97,48 @@ public class HiveEndPoint { /** + * @deprecated Use {@link #newConnection(boolean, String)} + */ + public StreamingConnection newConnection(final boolean createPartIfNotExists) + throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed + , ImpersonationFailed , InterruptedException { + return newConnection(createPartIfNotExists, null, null, null); + } + /** + * @deprecated Use {@link #newConnection(boolean, HiveConf, String)} + */ + public StreamingConnection newConnection(final boolean createPartIfNotExists, HiveConf conf) + throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed + , ImpersonationFailed , InterruptedException { + return newConnection(createPartIfNotExists, conf, null, null); + } + /** + * @deprecated Use {@link #newConnection(boolean, HiveConf, UserGroupInformation, String)} + */ + public StreamingConnection newConnection(final boolean createPartIfNotExists, final HiveConf conf, + final UserGroupInformation authenticatedUser) + throws ConnectionError, InvalidPartition, + InvalidTable, PartitionCreationFailed, ImpersonationFailed , InterruptedException { + return newConnection(createPartIfNotExists, conf, authenticatedUser, null); + } + /** * Acquire a new connection to MetaStore for streaming * @param createPartIfNotExists If true, the partition specified in the endpoint * will be auto created if it does not exist + * @param agentInfo should uniquely identify the process/entity that is using this batch. This + * should be something that can be correlated with calling application log files + * and/or monitoring consoles. * @return * @throws ConnectionError if problem connecting * @throws InvalidPartition if specified partition is not valid (createPartIfNotExists = false) * @throws ImpersonationFailed if not able to impersonate 'proxyUser' - * @throws IOException if there was an I/O error when acquiring connection * @throws PartitionCreationFailed if failed to create partition * @throws InterruptedException */ - public StreamingConnection newConnection(final boolean createPartIfNotExists) - throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed - , ImpersonationFailed , InterruptedException { - return newConnection(createPartIfNotExists, null, null); + public StreamingConnection newConnection(final boolean createPartIfNotExists, String agentInfo) + throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed + , ImpersonationFailed , InterruptedException { + return newConnection(createPartIfNotExists, null, null, agentInfo); } /** @@ -119,18 +146,20 @@ public class HiveEndPoint { * @param createPartIfNotExists If true, the partition specified in the endpoint * will be auto created if it does not exist * @param conf HiveConf object, set it to null if not using advanced hive settings. + * @param agentInfo should uniquely identify the process/entity that is using this batch. This + * should be something that can be correlated with calling application log files + * and/or monitoring consoles. * @return * @throws ConnectionError if problem connecting * @throws InvalidPartition if specified partition is not valid (createPartIfNotExists = false) * @throws ImpersonationFailed if not able to impersonate 'proxyUser' - * @throws IOException if there was an I/O error when acquiring connection * @throws PartitionCreationFailed if failed to create partition * @throws InterruptedException */ - public StreamingConnection newConnection(final boolean createPartIfNotExists, HiveConf conf) + public StreamingConnection newConnection(final boolean createPartIfNotExists, HiveConf conf, String agentInfo) throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed , ImpersonationFailed , InterruptedException { - return newConnection(createPartIfNotExists, conf, null); + return newConnection(createPartIfNotExists, conf, null, agentInfo); } /** @@ -144,21 +173,23 @@ public class HiveEndPoint { * @param conf HiveConf object to be used for the connection. Can be null. * @param authenticatedUser UserGroupInformation object obtained from successful authentication. * Uses non-secure mode if this argument is null. + * @param agentInfo should uniquely identify the process/entity that is using this batch. This + * should be something that can be correlated with calling application log files + * and/or monitoring consoles. * @return * @throws ConnectionError if there is a connection problem * @throws InvalidPartition if specified partition is not valid (createPartIfNotExists = false) * @throws ImpersonationFailed if not able to impersonate 'username' - * @throws IOException if there was an I/O error when acquiring connection * @throws PartitionCreationFailed if failed to create partition * @throws InterruptedException */ public StreamingConnection newConnection(final boolean createPartIfNotExists, final HiveConf conf, - final UserGroupInformation authenticatedUser) + final UserGroupInformation authenticatedUser, final String agentInfo) throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed, ImpersonationFailed , InterruptedException { if( authenticatedUser==null ) { - return newConnectionImpl(authenticatedUser, createPartIfNotExists, conf); + return newConnectionImpl(authenticatedUser, createPartIfNotExists, conf, agentInfo); } try { @@ -168,7 +199,7 @@ public class HiveEndPoint { public StreamingConnection run() throws ConnectionError, InvalidPartition, InvalidTable , PartitionCreationFailed { - return newConnectionImpl(authenticatedUser, createPartIfNotExists, conf); + return newConnectionImpl(authenticatedUser, createPartIfNotExists, conf, agentInfo); } } ); @@ -178,10 +209,10 @@ public class HiveEndPoint { } private StreamingConnection newConnectionImpl(UserGroupInformation ugi, - boolean createPartIfNotExists, HiveConf conf) + boolean createPartIfNotExists, HiveConf conf, String agentInfo) throws ConnectionError, InvalidPartition, InvalidTable , PartitionCreationFailed { - return new ConnectionImpl(this, ugi, conf, createPartIfNotExists); + return new ConnectionImpl(this, ugi, conf, createPartIfNotExists, agentInfo); } private static UserGroupInformation getUserGroupInfo(String user) @@ -250,6 +281,7 @@ public class HiveEndPoint { private final UserGroupInformation ugi; private final String username; private final boolean secureMode; + private final String agentInfo; /** * @param endPoint end point to connect to @@ -262,11 +294,12 @@ public class HiveEndPoint { * @throws PartitionCreationFailed if createPart=true and not able to create partition */ private ConnectionImpl(HiveEndPoint endPoint, UserGroupInformation ugi, - HiveConf conf, boolean createPart) + HiveConf conf, boolean createPart, String agentInfo) throws ConnectionError, InvalidPartition, InvalidTable , PartitionCreationFailed { this.endPt = endPoint; this.ugi = ugi; + this.agentInfo = agentInfo; this.username = ugi==null ? System.getProperty("user.name") : ugi.getShortUserName(); if (conf==null) { conf = HiveEndPoint.createHiveConf(this.getClass(), endPoint.metaStoreUri); @@ -397,7 +430,7 @@ public class HiveEndPoint { RecordWriter recordWriter) throws StreamingException, TransactionBatchUnAvailable, InterruptedException { return new TransactionBatchImpl(username, ugi, endPt, numTransactions, msClient - , recordWriter); + , recordWriter, agentInfo); } @@ -530,6 +563,7 @@ public class HiveEndPoint { * file backing this batch any more. This guards important public methods */ private volatile boolean isClosed = false; + private final String agentInfo; /** * Represents a batch of transactions acquired from MetaStore @@ -537,8 +571,9 @@ public class HiveEndPoint { * @throws StreamingException if failed to create new RecordUpdater for batch * @throws TransactionBatchUnAvailable if failed to acquire a new Transaction batch */ - private TransactionBatchImpl(final String user, UserGroupInformation ugi, HiveEndPoint endPt - , final int numTxns, final IMetaStoreClient msClient, RecordWriter recordWriter) + private TransactionBatchImpl(final String user, UserGroupInformation ugi, HiveEndPoint endPt, + final int numTxns, final IMetaStoreClient msClient, RecordWriter recordWriter, + String agentInfo) throws StreamingException, TransactionBatchUnAvailable, InterruptedException { boolean success = false; try { @@ -554,6 +589,7 @@ public class HiveEndPoint { this.endPt = endPt; this.msClient = msClient; this.recordWriter = recordWriter; + this.agentInfo = agentInfo; txnIds = openTxnImpl(msClient, user, numTxns, ugi); @@ -628,7 +664,7 @@ public class HiveEndPoint { " current batch for end point : " + endPt); ++currentTxnIndex; state = TxnState.OPEN; - lockRequest = createLockRequest(endPt, partNameForLock, username, getCurrentTxnId()); + lockRequest = createLockRequest(endPt, partNameForLock, username, getCurrentTxnId(), agentInfo); try { LockResponse res = msClient.lock(lockRequest); if (res.getState() != LockState.ACQUIRED) { @@ -957,8 +993,9 @@ public class HiveEndPoint { } private static LockRequest createLockRequest(final HiveEndPoint hiveEndPoint, - String partNameForLock, String user, long txnId) { - LockRequestBuilder rqstBuilder = new LockRequestBuilder(); + String partNameForLock, String user, long txnId, String agentInfo) { + LockRequestBuilder rqstBuilder = agentInfo == null ? + new LockRequestBuilder() : new LockRequestBuilder(agentInfo); rqstBuilder.setUser(user); rqstBuilder.setTransactionId(txnId); http://git-wip-us.apache.org/repos/asf/hive/blob/a1fe6829/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/StreamingIntegrationTester.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/StreamingIntegrationTester.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/StreamingIntegrationTester.java index bf2cc63..3acfa35 100644 --- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/StreamingIntegrationTester.java +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/StreamingIntegrationTester.java @@ -276,7 +276,7 @@ public class StreamingIntegrationTester { public void run() { StreamingConnection conn = null; try { - conn = endPoint.newConnection(true); + conn = endPoint.newConnection(true, "UT_" + Thread.currentThread().getName()); RecordWriter writer = new DelimitedInputWriter(cols, ",", endPoint); for (int i = 0; i < batches; i++) { http://git-wip-us.apache.org/repos/asf/hive/blob/a1fe6829/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java index 4d2a2ee..84e559d 100644 --- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java @@ -49,6 +49,8 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; +import org.apache.hadoop.hive.metastore.api.LockState; +import org.apache.hadoop.hive.metastore.api.LockType; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.ShowLocksRequest; @@ -302,7 +304,7 @@ public class TestStreaming { "ss_customer_sk", "ss_cdemo_sk", "ss_hdemo_sk", "ss_addr_sk", "ss_store_sk", "ss_promo_sk", "ss_ticket_number", "ss_quantity", "ss_wholesale_cost", "ss_list_price", "ss_sales_price", "ss_ext_discount_amt", "ss_ext_sales_price", "ss_ext_wholesale_cost", "ss_ext_list_price", "ss_ext_tax", "ss_coupon_amt", "ss_net_paid", "ss_net_paid_inc_tax", "ss_net_profit"},",", endPt); - StreamingConnection connection = endPt.newConnection(false, null);//should this really be null? + StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName()); TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer); txnBatch.beginNextTransaction(); @@ -376,7 +378,7 @@ public class TestStreaming { HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testBucketing3", "streamedtable", null); String[] colNames1 = new String[] { "key1", "key2", "data" }; DelimitedInputWriter wr = new DelimitedInputWriter(colNames1,",", endPt); - StreamingConnection connection = endPt.newConnection(false); + StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName()); TransactionBatch txnBatch = connection.fetchTransactionBatch(2, wr); txnBatch.beginNextTransaction(); @@ -427,14 +429,14 @@ public class TestStreaming { try { HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testBucketing3", "validation1", null); - endPt.newConnection(false); + endPt.newConnection(false, "UT_" + Thread.currentThread().getName()); Assert.assertTrue("InvalidTable exception was not thrown", false); } catch (InvalidTable e) { // expecting this exception } try { HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testBucketing3", "validation2", null); - endPt.newConnection(false); + endPt.newConnection(false, "UT_" + Thread.currentThread().getName()); Assert.assertTrue("InvalidTable exception was not thrown", false); } catch (InvalidTable e) { // expecting this exception @@ -498,17 +500,17 @@ public class TestStreaming { public void testEndpointConnection() throws Exception { // For partitioned table, partitionVals are specified HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals); - StreamingConnection connection = endPt.newConnection(false, null); //shouldn't throw + StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName()); //shouldn't throw connection.close(); // For unpartitioned table, partitionVals are not specified endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null); - endPt.newConnection(false, null).close(); // should not throw + endPt.newConnection(false, "UT_" + Thread.currentThread().getName()).close(); // should not throw // For partitioned table, partitionVals are not specified try { endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, null); - connection = endPt.newConnection(true); + connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName()); Assert.assertTrue("ConnectionError was not thrown", false); connection.close(); } catch (ConnectionError e) { @@ -520,7 +522,7 @@ public class TestStreaming { // For unpartitioned table, partition values are specified try { endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, partitionVals); - connection = endPt.newConnection(false); + connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName()); Assert.assertTrue("ConnectionError was not thrown", false); connection.close(); } catch (ConnectionError e) { @@ -548,7 +550,7 @@ public class TestStreaming { } // Create partition - Assert.assertNotNull(endPt.newConnection(true, null)); + Assert.assertNotNull(endPt.newConnection(true, "UT_" + Thread.currentThread().getName())); // Ensure partition is present Partition p = msClient.getPartition(endPt.database, endPt.table, endPt.partitionVals); @@ -561,7 +563,7 @@ public class TestStreaming { HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals); DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt); - StreamingConnection connection = endPt.newConnection(false, null); + StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName()); TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer); @@ -575,7 +577,7 @@ public class TestStreaming { // 2) To unpartitioned table endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null); writer = new DelimitedInputWriter(fieldNames2,",", endPt); - connection = endPt.newConnection(false, null); + connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName()); txnBatch = connection.fetchTransactionBatch(10, writer); txnBatch.beginNextTransaction(); @@ -594,7 +596,7 @@ public class TestStreaming { public void testTimeOutReaper() throws Exception { HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null); DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames2,",", endPt); - StreamingConnection connection = endPt.newConnection(false, null); + StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName()); TransactionBatch txnBatch = connection.fetchTransactionBatch(5, writer); txnBatch.beginNextTransaction(); @@ -640,7 +642,7 @@ public class TestStreaming { public void testHeartbeat() throws Exception { HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null); DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames2,",", endPt); - StreamingConnection connection = endPt.newConnection(false, null); + StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName()); TransactionBatch txnBatch = connection.fetchTransactionBatch(5, writer); txnBatch.beginNextTransaction(); @@ -669,7 +671,7 @@ public class TestStreaming { HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals); DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt); - StreamingConnection connection = endPt.newConnection(true); + StreamingConnection connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName()); TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer); txnBatch.beginNextTransaction(); @@ -682,7 +684,7 @@ public class TestStreaming { // 2) to unpartitioned table endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null); writer = new DelimitedInputWriter(fieldNames,",", endPt); - connection = endPt.newConnection(true); + connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName()); txnBatch = connection.fetchTransactionBatch(10, writer); txnBatch.beginNextTransaction(); @@ -698,7 +700,7 @@ public class TestStreaming { HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals); DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt); - StreamingConnection connection = endPt.newConnection(true); + StreamingConnection connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName()); // 1st Txn TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer); @@ -738,7 +740,7 @@ public class TestStreaming { // To Unpartitioned table endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null); writer = new DelimitedInputWriter(fieldNames,",", endPt); - connection = endPt.newConnection(true); + connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName()); // 1st Txn txnBatch = connection.fetchTransactionBatch(10, writer); @@ -758,7 +760,7 @@ public class TestStreaming { HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals); StrictJsonWriter writer = new StrictJsonWriter(endPt); - StreamingConnection connection = endPt.newConnection(true); + StreamingConnection connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName()); // 1st Txn TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer); @@ -786,7 +788,7 @@ public class TestStreaming { HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals); DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt); - StreamingConnection connection = endPt.newConnection(true); + StreamingConnection connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName()); // 1) test with txn.Commit() TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer); @@ -843,7 +845,7 @@ public class TestStreaming { HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals); DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt); - StreamingConnection connection = endPt.newConnection(false); + StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName()); TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer); @@ -867,16 +869,21 @@ public class TestStreaming { @Test public void testTransactionBatchAbortAndCommit() throws Exception { - + String agentInfo = "UT_" + Thread.currentThread().getName(); HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals); DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt); - StreamingConnection connection = endPt.newConnection(false); + StreamingConnection connection = endPt.newConnection(false, agentInfo); TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer); txnBatch.beginNextTransaction(); txnBatch.write("1,Hello streaming".getBytes()); txnBatch.write("2,Welcome to streaming".getBytes()); + ShowLocksResponse resp = msClient.showLocks(new ShowLocksRequest()); + Assert.assertEquals("LockCount", 1, resp.getLocksSize()); + Assert.assertEquals("LockType", LockType.SHARED_READ, resp.getLocks().get(0).getType()); + Assert.assertEquals("LockState", LockState.ACQUIRED, resp.getLocks().get(0).getState()); + Assert.assertEquals("AgentInfo", agentInfo, resp.getLocks().get(0).getAgentInfo()); txnBatch.abort(); checkNothingWritten(partLoc); @@ -901,7 +908,7 @@ public class TestStreaming { HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals); DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt); - StreamingConnection connection = endPt.newConnection(true); + StreamingConnection connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName()); TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer); txnBatch.beginNextTransaction(); @@ -950,7 +957,7 @@ public class TestStreaming { HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals); DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames, ",", endPt); - StreamingConnection connection = endPt.newConnection(false); + StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName()); // Acquire 1st Txn Batch TransactionBatch txnBatch1 = connection.fetchTransactionBatch(10, writer); @@ -1017,7 +1024,7 @@ public class TestStreaming { WriterThd(HiveEndPoint ep, String data) throws Exception { super("Writer_" + data); writer = new DelimitedInputWriter(fieldNames, ",", ep); - conn = ep.newConnection(false); + conn = ep.newConnection(false, "UT_" + Thread.currentThread().getName()); this.data = data; setUncaughtExceptionHandler(new UncaughtExceptionHandler() { @Override @@ -1141,6 +1148,7 @@ public class TestStreaming { @Test public void testBucketing() throws Exception { + String agentInfo = "UT_" + Thread.currentThread().getName(); dropDB(msClient, dbName3); dropDB(msClient, dbName4); @@ -1166,7 +1174,7 @@ public class TestStreaming { // 2) Insert data into both tables HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null); DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt); - StreamingConnection connection = endPt.newConnection(false); + StreamingConnection connection = endPt.newConnection(false, agentInfo); TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer); txnBatch.beginNextTransaction(); @@ -1179,7 +1187,7 @@ public class TestStreaming { HiveEndPoint endPt2 = new HiveEndPoint(metaStoreURI, dbName4, tblName4, null); DelimitedInputWriter writer2 = new DelimitedInputWriter(colNames2,",", endPt2); - StreamingConnection connection2 = endPt2.newConnection(false); + StreamingConnection connection2 = endPt2.newConnection(false, agentInfo); TransactionBatch txnBatch2 = connection2.fetchTransactionBatch(2, writer2); txnBatch2.beginNextTransaction(); @@ -1217,6 +1225,7 @@ public class TestStreaming { @Test public void testFileDump() throws Exception { + String agentInfo = "UT_" + Thread.currentThread().getName(); dropDB(msClient, dbName3); dropDB(msClient, dbName4); @@ -1242,7 +1251,7 @@ public class TestStreaming { // 2) Insert data into both tables HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null); DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt); - StreamingConnection connection = endPt.newConnection(false); + StreamingConnection connection = endPt.newConnection(false, agentInfo); TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer); txnBatch.beginNextTransaction(); @@ -1270,7 +1279,7 @@ public class TestStreaming { HiveEndPoint endPt2 = new HiveEndPoint(metaStoreURI, dbName4, tblName4, null); DelimitedInputWriter writer2 = new DelimitedInputWriter(colNames2,",", endPt2); - StreamingConnection connection2 = endPt2.newConnection(false); + StreamingConnection connection2 = endPt2.newConnection(false, agentInfo); TransactionBatch txnBatch2 = connection2.fetchTransactionBatch(2, writer2); txnBatch2.beginNextTransaction(); @@ -1314,7 +1323,7 @@ public class TestStreaming { // 2) Insert data into both tables HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null); DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt); - StreamingConnection connection = endPt.newConnection(false); + StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName()); // we need side file for this test, so we create 2 txn batch and test with only one TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer); @@ -1440,7 +1449,7 @@ public class TestStreaming { // 2) Insert data into both tables HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null); DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt); - StreamingConnection connection = endPt.newConnection(false); + StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName()); TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer); txnBatch.beginNextTransaction(); @@ -1655,6 +1664,7 @@ public class TestStreaming { @Test public void testErrorHandling() throws Exception { + String agentInfo = "UT_" + Thread.currentThread().getName(); runCmdOnDriver("create database testErrors"); runCmdOnDriver("use testErrors"); runCmdOnDriver("create table T(a int, b int) clustered by (b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); @@ -1662,7 +1672,7 @@ public class TestStreaming { HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testErrors", "T", null); DelimitedInputWriter innerWriter = new DelimitedInputWriter("a,b".split(","),",", endPt); FaultyWriter writer = new FaultyWriter(innerWriter); - StreamingConnection connection = endPt.newConnection(false); + StreamingConnection connection = endPt.newConnection(false, agentInfo); TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer); txnBatch.close(); http://git-wip-us.apache.org/repos/asf/hive/blob/a1fe6829/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java index e6ccdbc..ca2a912 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java @@ -430,7 +430,7 @@ public class TestCompactor { * and starts it's own CliSessionState and then closes it, which removes it from ThreadLoacal; * thus the session * created in this class is gone after this; I fixed it in HiveEndPoint*/ - StreamingConnection connection = endPt.newConnection(true); + StreamingConnection connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName()); TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer); txnBatch.beginNextTransaction(); @@ -629,7 +629,7 @@ public class TestCompactor { HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null); DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt); - StreamingConnection connection = endPt.newConnection(false); + StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName()); try { // Write a couple of batches for (int i = 0; i < 2; i++) { @@ -691,7 +691,7 @@ public class TestCompactor { HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null); DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt); - StreamingConnection connection = endPt.newConnection(false); + StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName()); try { // Write a couple of batches for (int i = 0; i < 2; i++) { @@ -731,6 +731,7 @@ public class TestCompactor { @Test public void minorCompactAfterAbort() throws Exception { + String agentInfo = "UT_" + Thread.currentThread().getName(); String dbName = "default"; String tblName = "cws"; List<String> colNames = Arrays.asList("a", "b"); @@ -743,7 +744,7 @@ public class TestCompactor { HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null); DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt); - StreamingConnection connection = endPt.newConnection(false); + StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName()); try { // Write a couple of batches for (int i = 0; i < 2; i++) { @@ -808,7 +809,7 @@ public class TestCompactor { HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null); DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt); - StreamingConnection connection = endPt.newConnection(false); + StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName()); try { // Write a couple of batches for (int i = 0; i < 2; i++) { http://git-wip-us.apache.org/repos/asf/hive/blob/a1fe6829/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 9c1b399..57517dc 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -960,16 +960,25 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } long now = getDbTime(dbConn); s = "insert into HIVE_LOCKS " + - " (hl_lock_ext_id, hl_lock_int_id, hl_txnid, hl_db, hl_table, " + - "hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, hl_user, hl_host)" + - " values (" + extLockId + ", " + - +intLockId + "," + txnid + ", '" + - dbName + "', " + (tblName == null ? "null" : "'" + tblName + "'") - + ", " + (partName == null ? "null" : "'" + partName + "'") + - ", '" + LOCK_WAITING + "', " + "'" + lockChar + "', " + + "(hl_lock_ext_id, hl_lock_int_id, hl_txnid, " + + "hl_db, " + + "hl_table, " + + "hl_partition, " + + "hl_lock_state, hl_lock_type, " + + "hl_last_heartbeat, " + + "hl_user, " + + "hl_host, " + + "hl_agent_info) values(" + + extLockId + ", " + intLockId + "," + txnid + ", " + + quoteString(dbName) + ", " + + valueOrNullLiteral(tblName) + ", " + + valueOrNullLiteral(partName) + ", " + + quoteChar(LOCK_WAITING) + ", " + quoteChar(lockChar) + ", " + //for locks associated with a txn, we always heartbeat txn and timeout based on that - (isValidTxn(txnid) ? 0 : now) + ", '" + - rqst.getUser() + "', '" + rqst.getHostname() + "')"; + (isValidTxn(txnid) ? 0 : now) + ", " + + valueOrNullLiteral(rqst.getUser()) + ", " + + valueOrNullLiteral(rqst.getHostname()) + ", " + + valueOrNullLiteral(rqst.getAgentInfo()) + ")"; LOG.debug("Going to execute update <" + s + ">"); stmt.executeUpdate(s); } @@ -1175,7 +1184,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { String s = "select hl_lock_ext_id, hl_txnid, hl_db, hl_table, hl_partition, hl_lock_state, " + "hl_lock_type, hl_last_heartbeat, hl_acquired_at, hl_user, hl_host, hl_lock_int_id," + - "hl_blockedby_ext_id, hl_blockedby_int_id from HIVE_LOCKS"; + "hl_blockedby_ext_id, hl_blockedby_int_id, hl_agent_info from HIVE_LOCKS"; // Some filters may have been specified in the SHOW LOCKS statement. Add them to the query. String dbName = rqst.getDbname(); @@ -1240,6 +1249,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { if(!rs.wasNull()) { e.setBlockedByIntId(id); } + e.setAgentInfo(rs.getString(15)); sortedList.add(new LockInfoExt(e)); } LOG.debug("Going to rollback"); @@ -3186,6 +3196,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { throw new MetaException(msg); } } + /** + * Useful for building SQL strings + * @param value may be {@code null} + */ + private static String valueOrNullLiteral(String value) { + return value == null ? "null" : quoteString(value); + } static String quoteString(String input) { return "'" + input + "'"; } http://git-wip-us.apache.org/repos/asf/hive/blob/a1fe6829/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index 00bff6b..3b634e0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -2577,6 +2577,8 @@ public class DDLTask extends Task<DDLWork> implements Serializable { os.writeBytes("User"); os.write(separator); os.writeBytes("Hostname"); + os.write(separator); + os.writeBytes("Agent Info"); os.write(terminator); List<ShowLocksResponseElement> locks = rsp.getLocks(); @@ -2616,6 +2618,8 @@ public class DDLTask extends Task<DDLWork> implements Serializable { os.write(separator); os.writeBytes(lock.getHostname()); os.write(separator); + os.writeBytes(lock.getAgentInfo() == null ? "NULL" : lock.getAgentInfo()); + os.write(separator); os.write(terminator); } } http://git-wip-us.apache.org/repos/asf/hive/blob/a1fe6829/ql/src/java/org/apache/hadoop/hive/ql/plan/ShowLocksDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ShowLocksDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ShowLocksDesc.java index 45a86f6..2a855f8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ShowLocksDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ShowLocksDesc.java @@ -53,8 +53,8 @@ public class ShowLocksDesc extends DDLDesc implements Serializable { * Schema for use with db txn manager. */ private static final String newFormatSchema = "lockid,database,table,partition,lock_state," + - "blocked_by,lock_type,transaction_id,last_heartbeat,acquired_at,user,hostname#" + - "string:string:string:string:string:string:string:string:string:string:string:string"; + "blocked_by,lock_type,transaction_id,last_heartbeat,acquired_at,user,hostname,agent_info#" + + "string:string:string:string:string:string:string:string:string:string:string:string:string"; public String getDatabase() { return dbName; http://git-wip-us.apache.org/repos/asf/hive/blob/a1fe6829/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java index 4782213..19cde2f 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java @@ -1376,4 +1376,16 @@ public class TestDbTxnManager2 { ShowLocksResponse rsp = ((DbLockManager)txnMgr.getLockManager()).getLocks(rqst); return rsp.getLocks(); } + + @Test + public void testShowLocksAgentInfo() throws Exception { + CommandProcessorResponse cpr = driver.run("create table if not exists XYZ (a int, b int)"); + checkCmdOnDriver(cpr); + checkCmdOnDriver(driver.compileAndRespond("select a from XYZ where b = 8")); + txnMgr.acquireLocks(driver.getPlan(), ctx, "XYZ"); + List<ShowLocksResponseElement> locks = getLocks(txnMgr); + Assert.assertEquals("Unexpected lock count", 1, locks.size()); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "XYZ", null, locks.get(0)); + Assert.assertEquals("Wrong AgentInfo", driver.getPlan().getQueryId(), locks.get(0).getAgentInfo()); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/a1fe6829/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out b/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out index c1adeb3..ef07a2a 100644 --- a/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out +++ b/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out @@ -2,17 +2,17 @@ PREHOOK: query: show locks PREHOOK: type: SHOWLOCKS POSTHOOK: query: show locks POSTHOOK: type: SHOWLOCKS -Lock ID Database Table Partition State Blocked By Type Transaction ID Last Hearbeat Acquired At User Hostname +Lock ID Database Table Partition State Blocked By Type Transaction ID Last Hearbeat Acquired At User Hostname Agent Info PREHOOK: query: show locks extended PREHOOK: type: SHOWLOCKS POSTHOOK: query: show locks extended POSTHOOK: type: SHOWLOCKS -Lock ID Database Table Partition State Blocked By Type Transaction ID Last Hearbeat Acquired At User Hostname +Lock ID Database Table Partition State Blocked By Type Transaction ID Last Hearbeat Acquired At User Hostname Agent Info PREHOOK: query: show locks default PREHOOK: type: SHOWLOCKS POSTHOOK: query: show locks default POSTHOOK: type: SHOWLOCKS -Lock ID Database Table Partition State Blocked By Type Transaction ID Last Hearbeat Acquired At User Hostname +Lock ID Database Table Partition State Blocked By Type Transaction ID Last Hearbeat Acquired At User Hostname Agent Info PREHOOK: query: show transactions PREHOOK: type: SHOW TRANSACTIONS POSTHOOK: query: show transactions @@ -30,27 +30,27 @@ PREHOOK: query: show locks database default PREHOOK: type: SHOWLOCKS POSTHOOK: query: show locks database default POSTHOOK: type: SHOWLOCKS -Lock ID Database Table Partition State Blocked By Type Transaction ID Last Hearbeat Acquired At User Hostname +Lock ID Database Table Partition State Blocked By Type Transaction ID Last Hearbeat Acquired At User Hostname Agent Info PREHOOK: query: show locks partitioned_acid_table PREHOOK: type: SHOWLOCKS POSTHOOK: query: show locks partitioned_acid_table POSTHOOK: type: SHOWLOCKS -Lock ID Database Table Partition State Blocked By Type Transaction ID Last Hearbeat Acquired At User Hostname +Lock ID Database Table Partition State Blocked By Type Transaction ID Last Hearbeat Acquired At User Hostname Agent Info PREHOOK: query: show locks partitioned_acid_table extended PREHOOK: type: SHOWLOCKS POSTHOOK: query: show locks partitioned_acid_table extended POSTHOOK: type: SHOWLOCKS -Lock ID Database Table Partition State Blocked By Type Transaction ID Last Hearbeat Acquired At User Hostname +Lock ID Database Table Partition State Blocked By Type Transaction ID Last Hearbeat Acquired At User Hostname Agent Info PREHOOK: query: show locks partitioned_acid_table partition (p='abc') PREHOOK: type: SHOWLOCKS POSTHOOK: query: show locks partitioned_acid_table partition (p='abc') POSTHOOK: type: SHOWLOCKS -Lock ID Database Table Partition State Blocked By Type Transaction ID Last Hearbeat Acquired At User Hostname +Lock ID Database Table Partition State Blocked By Type Transaction ID Last Hearbeat Acquired At User Hostname Agent Info PREHOOK: query: show locks partitioned_acid_table partition (p='abc') extended PREHOOK: type: SHOWLOCKS POSTHOOK: query: show locks partitioned_acid_table partition (p='abc') extended POSTHOOK: type: SHOWLOCKS -Lock ID Database Table Partition State Blocked By Type Transaction ID Last Hearbeat Acquired At User Hostname +Lock ID Database Table Partition State Blocked By Type Transaction ID Last Hearbeat Acquired At User Hostname Agent Info PREHOOK: query: drop table partitioned_acid_table PREHOOK: type: DROPTABLE PREHOOK: Input: default@partitioned_acid_table