Repository: flume Updated Branches: refs/heads/trunk fff13b5e0 -> 318da2088
FLUME-2754 - Hive Sink skipping first transaction in each Batch of Hive Transactions (Deepesh Khandelwal via Roshan Naik) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/318da208 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/318da208 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/318da208 Branch: refs/heads/trunk Commit: 318da208844d02ed7554724ae526cefe94dd894c Parents: fff13b5 Author: Roshan Naik <[email protected]> Authored: Tue Aug 25 18:38:14 2015 -0700 Committer: Roshan Naik <[email protected]> Committed: Tue Aug 25 18:39:23 2015 -0700 ---------------------------------------------------------------------- .../org/apache/flume/sink/hive/HiveWriter.java | 10 ++++-- .../apache/flume/sink/hive/TestHiveWriter.java | 32 ++++++++++++++++++++ 2 files changed, 39 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/318da208/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java index aa8576e..46309be 100644 --- a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java +++ b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java @@ -88,6 +88,7 @@ class HiveWriter { this.serializer = serializer; this.recordWriter = serializer.createRecordWriter(endPoint); this.txnBatch = nextTxnBatch(recordWriter); + this.txnBatch.beginNextTransaction(); this.closed = false; this.lastUsed = System.currentTimeMillis(); } catch (InterruptedException e) { @@ -117,6 +118,10 @@ class HiveWriter { hearbeatNeeded = true; } + public int getRemainingTxns() { 
+ return txnBatch.remainingTransactions(); + } + /** * Write data, update stats @@ -212,7 +217,7 @@ class HiveWriter { /** * Aborts the current Txn and switches to next Txn. - * @throws StreamingException if could not get new Transaction Batch, or switch to next Txn + * @throws InterruptedException */ public void abort() throws InterruptedException { batch.clear(); @@ -332,8 +337,7 @@ class HiveWriter { return connection.fetchTransactionBatch(txnsPerBatch, recordWriter); // could block } }); - LOG.info("Acquired Txn Batch {}. Switching to first txn", batch); - batch.beginNextTransaction(); + LOG.info("Acquired Transaction batch {}", batch); } catch (Exception e) { throw new TxnBatchException(endPoint, e); } http://git-wip-us.apache.org/repos/asf/flume/blob/318da208/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java index 174f179..41bf0f6 100644 --- a/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java +++ b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java @@ -174,6 +174,38 @@ public class TestHiveWriter { checkRecordCountInTable(3); } + @Test + public void testTxnBatchConsumption() throws Exception { + // get a small txn batch and consume it, then roll to new batch, verify + // the number of remaining txns to ensure Txns are not accidentally skipped + + HiveEndPoint endPoint = new HiveEndPoint(metaStoreURI, dbName, tblName, partVals); + SinkCounter sinkCounter = new SinkCounter(this.getClass().getName()); + + int txnPerBatch = 3; + + HiveWriter writer = new HiveWriter(endPoint, txnPerBatch, true, timeout + , callTimeoutPool, "flumetest", serializer, sinkCounter); + 
Assert.assertEquals(writer.getRemainingTxns(),2); + writer.flush(true); + + Assert.assertEquals(writer.getRemainingTxns(), 1); + writer.flush(true); + + Assert.assertEquals(writer.getRemainingTxns(), 0); + writer.flush(true); + + // flip over to next batch + Assert.assertEquals(writer.getRemainingTxns(), 2); + writer.flush(true); + + Assert.assertEquals(writer.getRemainingTxns(), 1); + + writer.close(); + + } + private void checkRecordCountInTable(int expectedCount) throws CommandNeedRetryException, IOException { int count = TestUtil.listRecordsInTable(driver, dbName, tblName).size();
