Repository: flume Updated Branches: refs/heads/trunk 3fccd241d -> bd80c5e67
FLUME-2804. Hive sink - abort remaining transactions on shutdown (Sriharsha Chintalapani via Roshan Naik) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/bd80c5e6 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/bd80c5e6 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/bd80c5e6 Branch: refs/heads/trunk Commit: bd80c5e67deda4ec146f19fc92ff0c3bff61a982 Parents: 3fccd24 Author: Roshan Naik <[email protected]> Authored: Tue Sep 29 14:43:34 2015 -0700 Committer: Roshan Naik <[email protected]> Committed: Tue Sep 29 14:47:12 2015 -0700 ---------------------------------------------------------------------- .../org/apache/flume/sink/hive/HiveSink.java | 4 +- .../org/apache/flume/sink/hive/HiveWriter.java | 59 +++++++++++++++++++- 2 files changed, 59 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/bd80c5e6/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java index 6fe332a..d93bca3 100644 --- a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java +++ b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java @@ -320,6 +320,7 @@ public class HiveSink extends AbstractSink implements Configurable { sinkCounter.addToEventDrainSuccessCount(txnEventCount); return txnEventCount; } catch (HiveWriter.Failure e) { + // in case of error we close all TxnBatches to start clean next time LOG.warn(getName() + " : " + e.getMessage(), e); abortAllWriters(); closeAllWriters(); @@ -462,8 +463,7 @@ public class HiveSink extends AbstractSink implements Configurable { for (Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) { try { HiveWriter w = entry.getValue(); - LOG.info("Closing connection to {}", w); - w.closeConnection(); + w.close(); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); } http://git-wip-us.apache.org/repos/asf/flume/blob/bd80c5e6/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java index 46309be..ec30c98 100644 --- a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java +++ b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java @@ -216,7 +216,7 @@ class HiveWriter { } /** - * Aborts the current Txn and switches to next Txn. + * Aborts the current Txn * @throws InterruptedException */ public void abort() throws InterruptedException { @@ -253,11 +253,66 @@ class HiveWriter { */ public void close() throws InterruptedException { batch.clear(); + abortRemainingTxns(); closeTxnBatch(); closeConnection(); closed = true; } + + private void abortRemainingTxns() throws InterruptedException { + try { + if ( !isClosed(txnBatch.getCurrentTransactionState()) ) { + abortCurrTxnHelper(); + } + + // recursively abort remaining txns + if(txnBatch.remainingTransactions()>0) { + timedCall( + new CallRunner1<Void>() { + @Override + public Void call() throws StreamingException, InterruptedException { + txnBatch.beginNextTransaction(); + return null; + } + }); + abortRemainingTxns(); + } + } catch (StreamingException e) { + LOG.warn("Error when aborting remaining transactions in batch " + txnBatch, e); + return; + } catch (TimeoutException e) { + LOG.warn("Timed out when aborting remaining transactions in batch " + txnBatch, e); + return; + } + } + + private void abortCurrTxnHelper() throws TimeoutException, InterruptedException { + try { + timedCall( + new CallRunner1<Void>() { + @Override + public Void call() throws StreamingException, InterruptedException { + txnBatch.abort(); + LOG.info("Aborted txn " + txnBatch.getCurrentTxnId()); + return null; + } + } + ); + } catch (StreamingException e) { + LOG.warn("Unable to abort transaction " + txnBatch.getCurrentTxnId(), e); + // continue to attempt to abort other txns in the batch + } + } + + private boolean isClosed(TransactionBatch.TxnState txnState) { + if(txnState == TransactionBatch.TxnState.COMMITTED) + return true; + if(txnState == TransactionBatch.TxnState.ABORTED) + return true; + return false; + } + public void closeConnection() throws InterruptedException { LOG.info("Closing connection to EndPoint : {}", endPoint); try { @@ -346,7 +401,7 @@ class HiveWriter { private void closeTxnBatch() throws InterruptedException { try { - LOG.debug("Closing Txn Batch {}", txnBatch); + LOG.info("Closing Txn Batch {}.", txnBatch); timedCall(new CallRunner1<Void>() { @Override public Void call() throws InterruptedException, StreamingException {
