Updated Branches: refs/heads/flume-1.5 b17626f72 -> ad612c28b
FLUME-2235. idleFuture should be cancelled at the start of append (Hari Shreedharan via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/ad612c28 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/ad612c28 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/ad612c28 Branch: refs/heads/flume-1.5 Commit: ad612c28bec3845775e12d6bc51ef724e6f78f06 Parents: b17626f Author: Jarek Jarcec Cecho <[email protected]> Authored: Thu Nov 7 14:53:04 2013 -0800 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Thu Nov 7 14:54:09 2013 -0800 ---------------------------------------------------------------------- .../apache/flume/sink/hdfs/BucketWriter.java | 21 ++++++++++++++++++++ 1 file changed, 21 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/ad612c28/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java index 65f4d2c..200d457 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java @@ -375,6 +375,27 @@ class BucketWriter { public synchronized void append(final Event event) throws IOException, InterruptedException { checkAndThrowInterruptedException(); + // If idleFuture is not null, cancel it before we move forward to avoid a + // close call in the middle of the append. + if(idleFuture != null) { + idleFuture.cancel(false); + // There is still a small race condition - if the idleFuture is already + // running, interrupting it can cause HDFS close operation to throw - + // so we cannot interrupt it while running. If the future could not be + // cancelled, it is already running - wait for it to finish before + // attempting to write. + if(!idleFuture.isDone()) { + try { + idleFuture.get(callTimeout, TimeUnit.MILLISECONDS); + } catch (TimeoutException ex) { + LOG.warn("Timeout while trying to cancel closing of idle file. Idle" + + " file close may have failed", ex); + } catch (Exception ex) { + LOG.warn("Error while trying to cancel closing of idle file. ", ex); + } + } + idleFuture = null; + } if (!isOpen) { if(idleClosed) { throw new IOException("This bucket writer was closed due to idling and this handle " +
