Updated Branches:
  refs/heads/flume-1.4 36c4d2fd4 -> 03e21c1c2

FLUME-1864. Allow hdfs idle callback to clean up closed bucket writers.

(Juhani Connolly via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/03e21c1c
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/03e21c1c
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/03e21c1c

Branch: refs/heads/flume-1.4
Commit: 03e21c1c27e9f2f94d4ef8dfc786aaccdd42501c
Parents: 36c4d2f
Author: Hari Shreedharan <[email protected]>
Authored: Fri Feb 15 15:15:32 2013 -0800
Committer: Hari Shreedharan <[email protected]>
Committed: Fri Feb 15 15:17:13 2013 -0800

----------------------------------------------------------------------
 .../org/apache/flume/sink/hdfs/BucketWriter.java   |   13 +++++--------
 1 files changed, 5 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/03e21c1c/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
 
b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
index 1ff1984..0786857 100644
--- 
a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
+++ 
b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
@@ -309,11 +309,6 @@ class BucketWriter {
       timedRollFuture = null;
     }
 
-    if(idleFuture != null && !idleFuture.isDone()) {
-      idleFuture.cancel(false);
-      idleFuture = null;
-    }
-
     if (bucketPath != null && fileSystem != null) {
       renameBucket(); // could block or throw IOException
       fileSystem = null;
@@ -342,9 +337,11 @@ class BucketWriter {
           Callable<Void> idleAction = new Callable<Void>() {
             public Void call() throws Exception {
               try {
-                LOG.info("Closing idle bucketWriter {}", bucketPath);
-                idleClosed = true;
-                close();
+                if(isOpen) {
+                  LOG.info("Closing idle bucketWriter {}", bucketPath);
+                  idleClosed = true;
+                  close();
+                }
                 if(onIdleCallback != null)
                   onIdleCallback.run(onIdleCallbackPath);
               } catch(Throwable t) {

Reply via email to