Updated Branches: refs/heads/trunk e27ae5fdc -> 705abaf00
FLUME-2235. idleFuture should be cancelled at the start of append

(Hari Shreedharan via Jarek Jarcec Cecho)

Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/705abaf0
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/705abaf0
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/705abaf0

Branch: refs/heads/trunk
Commit: 705abaf00fbf8ee69ac88cbccae47c1a33f4b4b2
Parents: e27ae5f
Author: Jarek Jarcec Cecho <[email protected]>
Authored: Thu Nov 7 14:53:04 2013 -0800
Committer: Jarek Jarcec Cecho <[email protected]>
Committed: Thu Nov 7 14:53:04 2013 -0800

----------------------------------------------------------------------
 .../apache/flume/sink/hdfs/BucketWriter.java | 21 ++++++++++++++++++++
 1 file changed, 21 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flume/blob/705abaf0/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
index 65f4d2c..200d457 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
@@ -375,6 +375,27 @@ class BucketWriter {
   public synchronized void append(final Event event)
           throws IOException, InterruptedException {
     checkAndThrowInterruptedException();
+    // If idleFuture is not null, cancel it before we move forward to avoid a
+    // close call in the middle of the append.
+    if(idleFuture != null) {
+      idleFuture.cancel(false);
+      // There is still a small race condition - if the idleFuture is already
+      // running, interrupting it can cause HDFS close operation to throw -
+      // so we cannot interrupt it while running. If the future could not be
+      // cancelled, it is already running - wait for it to finish before
+      // attempting to write.
+      if(!idleFuture.isDone()) {
+        try {
+          idleFuture.get(callTimeout, TimeUnit.MILLISECONDS);
+        } catch (TimeoutException ex) {
+          LOG.warn("Timeout while trying to cancel closing of idle file. Idle" +
+            " file close may have failed", ex);
+        } catch (Exception ex) {
+          LOG.warn("Error while trying to cancel closing of idle file. ", ex);
+        }
+      }
+      idleFuture = null;
+    }
     if (!isOpen) {
       if(idleClosed) {
         throw new IOException("This bucket writer was closed due to idling and this handle " +
