Repository: hive Updated Branches: refs/heads/master fb07a1157 -> 1d2315829
HIVE-14813 Make TransactionBatchImpl.toString() include state of each txn: commit/abort (Eugene Koifman, reviewed by Alan Gates) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1d231582 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1d231582 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1d231582 Branch: refs/heads/master Commit: 1d2315829c84f5f28e6ea0a723da6471899030a3 Parents: fb07a11 Author: Eugene Koifman <ekoif...@hortonworks.com> Authored: Wed Sep 27 15:28:34 2017 -0700 Committer: Eugene Koifman <ekoif...@hortonworks.com> Committed: Wed Sep 27 15:28:34 2017 -0700 ---------------------------------------------------------------------- .../hive/hcatalog/streaming/HiveEndPoint.java | 26 ++++++++++++++++++-- .../hcatalog/streaming/TransactionBatch.java | 12 ++++++++- .../hive/hcatalog/streaming/TestStreaming.java | 13 +++++++++- 3 files changed, 47 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/1d231582/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java index 28c98bd..db3109e 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java @@ -19,9 +19,9 @@ package org.apache.hive.hcatalog.streaming; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.metastore.api.DataOperationType; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; -import org.apache.hadoop.hive.ql.io.AcidUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.cli.CliSessionState; @@ -577,6 +577,14 @@ public class HiveEndPoint { */ private volatile boolean isClosed = false; private final String agentInfo; + /** + * Tracks the state of each transaction + */ + private final TxnState[] txnStatus; + /** + * ID of the last txn used by {@link #beginNextTransactionImpl()} + */ + private long lastTxnUsed; /** * Represents a batch of transactions acquired from MetaStore @@ -606,6 +614,10 @@ public class HiveEndPoint { this.agentInfo = agentInfo; txnIds = openTxnImpl(msClient, user, numTxns, ugi); + txnStatus = new TxnState[numTxns]; + for(int i = 0; i < txnStatus.length; i++) { + txnStatus[i] = TxnState.OPEN;//Open matches Metastore state + } this.state = TxnState.INACTIVE; recordWriter.newBatch(txnIds.get(0), txnIds.get(txnIds.size()-1)); @@ -639,8 +651,14 @@ public class HiveEndPoint { if (txnIds==null || txnIds.isEmpty()) { return "{}"; } + StringBuilder sb = new StringBuilder(" TxnStatus["); + for(TxnState state : txnStatus) { + //'state' should not be null - future proofing + sb.append(state == null ? "N" : state); + } + sb.append("] LastUsed ").append(JavaUtils.txnIdToString(lastTxnUsed)); return "TxnIds=[" + txnIds.get(0) + "..." + txnIds.get(txnIds.size()-1) - + "] on endPoint = " + endPt; + + "] on endPoint = " + endPt + "; " + sb; } /** @@ -678,6 +696,7 @@ public class HiveEndPoint { " current batch for end point : " + endPt); ++currentTxnIndex; state = TxnState.OPEN; + lastTxnUsed = getCurrentTxnId(); lockRequest = createLockRequest(endPt, partNameForLock, username, getCurrentTxnId(), agentInfo); try { LockResponse res = msClient.lock(lockRequest); @@ -862,6 +881,7 @@ public class HiveEndPoint { recordWriter.flush(); msClient.commitTxn(txnIds.get(currentTxnIndex)); state = TxnState.COMMITTED; + txnStatus[currentTxnIndex] = TxnState.COMMITTED; } catch (NoSuchTxnException e) { throw new TransactionError("Invalid transaction id : " + getCurrentTxnId(), e); @@ -924,12 +944,14 @@ public class HiveEndPoint { for(currentTxnIndex = minOpenTxnIndex; currentTxnIndex < txnIds.size(); currentTxnIndex++) { msClient.rollbackTxn(txnIds.get(currentTxnIndex)); + txnStatus[currentTxnIndex] = TxnState.ABORTED; } currentTxnIndex--;//since the loop left it == txnId.size() } else { if (getCurrentTxnId() > 0) { msClient.rollbackTxn(getCurrentTxnId()); + txnStatus[currentTxnIndex] = TxnState.ABORTED; } } state = TxnState.ABORTED; http://git-wip-us.apache.org/repos/asf/hive/blob/1d231582/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java index 3bcc510..372ad2d 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java @@ -33,7 +33,17 @@ import java.util.Collection; * */ public interface TransactionBatch { - public enum TxnState {INACTIVE, OPEN, COMMITTED, ABORTED } + enum TxnState { + INACTIVE("I"), OPEN("O"), COMMITTED("C"), ABORTED("A"); + + private final String code; + TxnState(String code) { + this.code = code; + }; + public String toString() { + return code; + } + } /** * Activate the next available transaction in the current transaction batch http://git-wip-us.apache.org/repos/asf/hive/blob/1d231582/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java index 1e73a4b..921bbd3 100644 --- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java @@ -44,6 +44,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.cli.CliSessionState; +import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.Validator; @@ -1974,7 +1975,12 @@ public class TestStreaming { txnBatch.write("name4,2,more Streaming unlimited".getBytes()); txnBatch.write("name5,2,even more Streaming unlimited".getBytes()); txnBatch.commit(); - + + //test toString() + String s = txnBatch.toString(); + Assert.assertTrue("Actual: " + s, s.contains("LastUsed " + JavaUtils.txnIdToString(txnBatch.getCurrentTxnId()))); + Assert.assertTrue("Actual: " + s, s.contains("TxnStatus[CO]")); + expectedEx = null; txnBatch.beginNextTransaction(); writer.enableErrors(); @@ -1998,6 +2004,11 @@ public class TestStreaming { Assert.assertTrue("commit() should have failed", expectedEx != null && expectedEx.getMessage().contains("has been closed()")); + //test toString() + s = txnBatch.toString(); + Assert.assertTrue("Actual: " + s, s.contains("LastUsed " + JavaUtils.txnIdToString(txnBatch.getCurrentTxnId()))); + Assert.assertTrue("Actual: " + s, s.contains("TxnStatus[CA]")); + r = msClient.showTxns(); Assert.assertEquals("HWM didn't match", 19, r.getTxn_high_water_mark()); ti = r.getOpen_txns();