FLUME-1512: File Channel should not stop during a checkpoint (Hari Shreedharan via Brock Noland)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/74e1d700
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/74e1d700
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/74e1d700

Branch: refs/heads/cdh-1.2.0+24_intuit
Commit: 74e1d700f17fb021a779bbc7a3a42c8bd8ae5b4a
Parents: be6458f
Author: Brock Noland <[email protected]>
Authored: Fri Aug 24 09:35:00 2012 -0500
Committer: Mike Percy <[email protected]>
Committed: Fri Sep 7 14:03:06 2012 -0700

----------------------------------------------------------------------
 .../java/org/apache/flume/channel/file/Log.java | 56 ++++++++------
 1 files changed, 32 insertions(+), 24 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flume/blob/74e1d700/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
index 1e2706b..5b39b57 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
@@ -488,41 +488,49 @@ class Log {
     checkpointReadLock.unlock();
   }
+  private void lockExclusive(){
+    checkpointWriterLock.lock();
+  }
   /**
    * Synchronization required since we do not want this
    * to be called during a checkpoint.
    */
   synchronized void close() {
-    open = false;
-    if (worker != null) {
-      worker.shutdown();
-      worker.interrupt();
-    }
-    if (logFiles != null) {
-      for (int index = 0; index < logFiles.length(); index++) {
-        logFiles.get(index).close();
+    lockExclusive();
+    try {
+      open = false;
+      if (worker != null) {
+        worker.shutdown();
+        worker.interrupt();
       }
-    }
-    synchronized (idLogFileMap) {
-      for(Integer logId : idLogFileMap.keySet()) {
-        LogFile.RandomReader reader = idLogFileMap.get(logId);
-        if(reader != null) {
-          reader.close();
+      if (logFiles != null) {
+        for (int index = 0; index < logFiles.length(); index++) {
+          logFiles.get(index).close();
+        }
+      }
+      synchronized (idLogFileMap) {
+        for (Integer logId : idLogFileMap.keySet()) {
+          LogFile.RandomReader reader = idLogFileMap.get(logId);
+          if (reader != null) {
+            reader.close();
+          }
         }
       }
-    }
-    try {
-      unlock(checkpointDir);
-    } catch(IOException ex) {
-      LOGGER.warn("Error unlocking " + checkpointDir, ex);
-    }
-    for (File logDir : logDirs) {
       try {
-        unlock(logDir);
-      } catch(IOException ex) {
-        LOGGER.warn("Error unlocking " + logDir, ex);
+        unlock(checkpointDir);
+      } catch (IOException ex) {
+        LOGGER.warn("Error unlocking " + checkpointDir, ex);
       }
+      for (File logDir : logDirs) {
+        try {
+          unlock(logDir);
+        } catch (IOException ex) {
+          LOGGER.warn("Error unlocking " + logDir, ex);
+        }
+      }
+    } finally {
+      unlockExclusive();
     }
   }
