Updated Branches: refs/heads/trunk ce48a126e -> 9204456ee
FLUME-1930: Inflights should clean up executors on close (Hari Shreedharan via Juhani Connolly) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/9204456e Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/9204456e Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/9204456e Branch: refs/heads/trunk Commit: 9204456eee8522201649b31a949a5a77710c1b2e Parents: ce48a12 Author: Juhani Connolly <[email protected]> Authored: Thu Mar 14 15:57:54 2013 +0900 Committer: Juhani Connolly <[email protected]> Committed: Thu Mar 14 15:57:54 2013 +0900 ---------------------------------------------------------------------- .../org/apache/flume/channel/file/FileChannel.java | 8 ++++- .../apache/flume/channel/file/FlumeEventQueue.java | 28 ++++++-------- .../java/org/apache/flume/channel/file/Log.java | 2 +- .../org/apache/flume/channel/file/TestLog.java | 2 +- 4 files changed, 21 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/9204456e/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java index d98209b..ff42d19 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java @@ -27,6 +27,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Throwables; import org.apache.flume.Channel; import org.apache.flume.ChannelException; import org.apache.flume.Context; @@ -380,7 +381,12 @@ public class FileChannel extends BasicChannelSemantics { void close() { if(open) { open = false; - log.close(); + try { + log.close(); + } catch (Exception e) { + LOG.error("Error while trying to close the log.", e); + Throwables.propagate(e); + } log = null; queueRemaining = null; } http://git-wip-us.apache.org/repos/asf/flume/blob/9204456e/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java index 72d9425..1ed9547 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java @@ -30,10 +30,12 @@ import java.util.Collection; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.commons.lang.ArrayUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -322,9 +324,11 @@ final class FlumeEventQueue { return backingStore.getCapacity(); } - synchronized void close() { + synchronized void close() throws IOException { try { backingStore.close(); + inflightPuts.close(); + inflightTakes.close(); } catch (IOException e) { LOG.warn("Error closing backing store", e); } @@ -442,21 +446,9 @@ final class FlumeEventQueue { } byte[] checksum = digest.digest(buffer.array()); file.write(checksum); - future = Executors.newSingleThreadExecutor().submit( - new Runnable() { - @Override - public void run() { - try { - buffer.position(0); - fileChannel.write(buffer); - fileChannel.force(true); - } catch (IOException ex) { - LOG.error("Error while writing inflight events to " - + "inflights file: " - + inflightEventsFile.getName()); - } - } - }); + buffer.position(0); + fileChannel.write(buffer); + fileChannel.force(true); syncRequired = false; } catch (IOException ex) { LOG.error("Error while writing checkpoint to disk.", ex); @@ -527,5 +519,9 @@ final class FlumeEventQueue { public Collection<Long> getInFlightPointers() { return inflightEvents.values(); } + + public void close() throws IOException { + file.close(); + } } } http://git-wip-us.apache.org/repos/asf/flume/blob/9204456e/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java index 7da8c49..6ffc824 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java @@ -677,7 +677,7 @@ class Log { * Synchronization not required since this method gets the write lock, * so checkpoint and this method cannot run at the same time. */ - void close() { + void close() throws IOException{ lockExclusive(); try { open = false; http://git-wip-us.apache.org/repos/asf/flume/blob/9204456e/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java index 6751714..54978f8 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java @@ -60,7 +60,7 @@ public class TestLog { log.replay(); } @After - public void cleanup() { + public void cleanup() throws Exception{ if(log != null) { log.close(); }
