Charan,

After reading the code more carefully, I think your observation is correct. This is an issue when an exception is thrown during flushing.
Can you file a JIRA for this? I think the fix should be straightforward: catch the exception and add the unflushed logs back to the list.

On Fri, Apr 21, 2017 at 5:22 PM, Sijie Guo <[email protected]> wrote:

> Charan, will check later today
>
> On Apr 20, 2017 10:55 PM, "Charan Reddy G" <[email protected]> wrote:
>
>> Hey,
>>
>> SyncThread.checkpoint(Checkpoint checkpoint) (which is called periodically
>> by SyncThread's executor for every flushInterval) ultimately calls
>> EntryLogger.flushRotatedLogs.
>>
>> In EntryLogger.flushRotatedLogs, we first set 'logChannelsToFlush' to null
>> and then try to flush and close each individual file. Now, if an
>> IOException happens while trying to flush/close the log channel, the
>> exception is thrown as it is and it propagates back up to
>> SyncThread.checkpoint. There we catch that IOException, log it and return
>> without calling checkpointComplete. But by now we have lost the reference
>> to 'logChannelsToFlush' (rolled logs which are yet to be closed), because
>> it is set to null before we try to flush/close the individual rolled logs.
>> The next execution of 'checkpoint' (after flushInterval) would not know
>> about the rolled logs it failed to flush/close the previous time, and it
>> would flush only the newly rolled logs. So the failure to flush/close the
>> previous rolled logs goes completely unnoticed.
>>
>> in EntryLogger.java
>>
>>     void flushRotatedLogs() throws IOException {
>>         List<BufferedLogChannel> channels = null;
>>         long flushedLogId = INVALID_LID;
>>         synchronized (this) {
>>             channels = logChannelsToFlush;
>>             logChannelsToFlush = null;   <--------- here we set 'logChannelsToFlush' to null before it tries to flush/close rolled logs
>>         }
>>         if (null == channels) {
>>             return;
>>         }
>>         for (BufferedLogChannel channel : channels) {
>>             channel.flush(true);   <------------ IOException can happen here or in the following closeFileChannel call
>>             // since this channel is only used for writing, after flushing the channel,
>>             // we had to close the underlying file channel. Otherwise, we might end up
>>             // leaking fds which cause the disk spaces could not be reclaimed.
>>             closeFileChannel(channel);
>>             if (channel.getLogId() > flushedLogId) {
>>                 flushedLogId = channel.getLogId();
>>             }
>>             LOG.info("Synced entry logger {} to disk.", channel.getLogId());
>>         }
>>         // move the leastUnflushedLogId ptr
>>         leastUnflushedLogId = flushedLogId + 1;
>>     }
>>
>> in SyncThread.java
>>
>>     public void checkpoint(Checkpoint checkpoint) {
>>         try {
>>             checkpoint = ledgerStorage.checkpoint(checkpoint);
>>         } catch (NoWritableLedgerDirException e) {
>>             LOG.error("No writeable ledger directories", e);
>>             dirsListener.allDisksFull();
>>             return;
>>         } catch (IOException e) {
>>             LOG.error("Exception flushing ledgers", e);   <----- that IOException gets propagated to this method, and here it is caught and not dealt with appropriately
>>             return;
>>         }
>>
>>         try {
>>             checkpointSource.checkpointComplete(checkpoint, true);
>>         } catch (IOException e) {
>>             LOG.error("Exception marking checkpoint as complete", e);
>>             dirsListener.allDisksFull();
>>         }
>>     }
>>
>> Thanks,
>> Charan
>>
>
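For the JIRA, here is a rough sketch of the fix suggested at the top of this mail (catch the exception and add the unflushed logs back to the list). It is not the actual EntryLogger code: the class RotatedLogFlushSketch, the LogChannel interface, flushAndClose(), addRotatedLog() and pendingCount() are hypothetical stand-ins so the example compiles on its own. It also assumes that the channels sit in the list in ascending log id order, so that leastUnflushedLogId can be moved past the ones that did get flushed.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class RotatedLogFlushSketch {

    /** Hypothetical stand-in for BufferedLogChannel: flush plus close as one step. */
    public interface LogChannel {
        long getLogId();
        void flushAndClose() throws IOException;
    }

    static final long INVALID_LID = -1L;

    private List<LogChannel> logChannelsToFlush;
    private long leastUnflushedLogId = 0L;

    /** Registers a rotated log that still has to be flushed and closed. */
    public synchronized void addRotatedLog(LogChannel channel) {
        if (logChannelsToFlush == null) {
            logChannelsToFlush = new ArrayList<>();
        }
        logChannelsToFlush.add(channel);
    }

    public void flushRotatedLogs() throws IOException {
        List<LogChannel> channels;
        synchronized (this) {
            channels = logChannelsToFlush;
            logChannelsToFlush = null;
        }
        if (channels == null) {
            return;
        }
        long flushedLogId = INVALID_LID;
        int next = 0; // index of the first channel that is not yet flushed
        try {
            for (; next < channels.size(); next++) {
                LogChannel channel = channels.get(next);
                channel.flushAndClose();
                flushedLogId = Math.max(flushedLogId, channel.getLogId());
            }
        } catch (IOException e) {
            // Put the failed channel and everything after it back, ahead of
            // any logs that were rotated while we were flushing.
            List<LogChannel> unflushed =
                    new ArrayList<>(channels.subList(next, channels.size()));
            synchronized (this) {
                if (logChannelsToFlush != null) {
                    unflushed.addAll(logChannelsToFlush);
                }
                logChannelsToFlush = unflushed;
            }
            throw e; // the caller still sees the failure
        } finally {
            // Only move the pointer past logs that were really flushed.
            if (flushedLogId != INVALID_LID) {
                leastUnflushedLogId = flushedLogId + 1;
            }
        }
    }

    public synchronized int pendingCount() {
        return logChannelsToFlush == null ? 0 : logChannelsToFlush.size();
    }

    public long getLeastUnflushedLogId() {
        return leastUnflushedLogId;
    }
}
```

With this shape the next checkpoint run retries the logs that failed before it touches the newly rotated ones. Whether it is safe to retry a channel whose flush succeeded but whose close failed depends on the real BufferedLogChannel, which this sketch does not model.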
