Charan, will check later today
On Apr 20, 2017 10:55 PM, "Charan Reddy G" <[email protected]> wrote:
> Hey,
>
> SyncThread.checkpoint(Checkpoint checkpoint), which SyncThread's executor
> calls periodically (once every flushInterval), ultimately calls
> EntryLogger.flushRotatedLogs.
>
> In EntryLogger.flushRotatedLogs, we first set 'logChannelsToFlush' to null
> and then try to flush and close each log channel. If an IOException occurs
> while flushing or closing a channel, it is thrown as is and propagates up
> to SyncThread.checkpoint. There we catch the IOException, log it, and
> return without calling checkpointComplete. By then we have lost the
> reference to 'logChannelsToFlush' (the rolled logs that are yet to be
> closed), because it was set to null before we tried to flush/close the
> individual rolled logs. The next execution of 'checkpoint' (after
> flushInterval) does not know about the rolled logs it failed to
> flush/close the previous time, so it flushes only the newly rolled logs.
> The failure to flush/close the previous rolled logs therefore goes
> completely unnoticed.
>
> In EntryLogger.java:
>     void flushRotatedLogs() throws IOException {
>         List<BufferedLogChannel> channels = null;
>         long flushedLogId = INVALID_LID;
>         synchronized (this) {
>             channels = logChannelsToFlush;
>             // <--- here we set 'logChannelsToFlush' to null before
>             //      trying to flush/close the rolled logs
>             logChannelsToFlush = null;
>         }
>         if (null == channels) {
>             return;
>         }
>         for (BufferedLogChannel channel : channels) {
>             // <--- an IOException can happen here or in the following
>             //      closeFileChannel call
>             channel.flush(true);
>             // since this channel is only used for writing, after
>             // flushing the channel, we had to close the underlying
>             // file channel. Otherwise, we might end up leaking fds
>             // which cause the disk spaces could not be reclaimed.
>             closeFileChannel(channel);
>             if (channel.getLogId() > flushedLogId) {
>                 flushedLogId = channel.getLogId();
>             }
>             LOG.info("Synced entry logger {} to disk.",
>                      channel.getLogId());
>         }
>         // move the leastUnflushedLogId ptr
>         leastUnflushedLogId = flushedLogId + 1;
>     }
>
> In SyncThread.java:
>     public void checkpoint(Checkpoint checkpoint) {
>         try {
>             checkpoint = ledgerStorage.checkpoint(checkpoint);
>         } catch (NoWritableLedgerDirException e) {
>             LOG.error("No writeable ledger directories", e);
>             dirsListener.allDisksFull();
>             return;
>         } catch (IOException e) {
>             // <--- the IOException propagates to this method, where it
>             //      is caught but not dealt with appropriately
>             LOG.error("Exception flushing ledgers", e);
>             return;
>         }
>
>         try {
>             checkpointSource.checkpointComplete(checkpoint, true);
>         } catch (IOException e) {
>             LOG.error("Exception marking checkpoint as complete", e);
>             dirsListener.allDisksFull();
>         }
>     }
>
> Thanks,
> Charan
>
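For illustration only, here is a minimal, self-contained sketch of one possible way to keep the failed rolled logs around for the next checkpoint. It is not the BookKeeper code and not a proposed patch: LogChannel, RotatedLogFlusher, FlakyChannel, addRotatedLog, flushAndClose and pendingCount are hypothetical names made up for this example, and it assumes channels are queued in increasing log id order. The idea is simply to put any channel that was not flushed back on 'logChannelsToFlush' before the IOException propagates.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for BufferedLogChannel (not the BookKeeper class).
interface LogChannel {
    long getLogId();
    // Combines channel.flush(true) and closeFileChannel(channel) for brevity.
    void flushAndClose() throws IOException;
}

// Sketch: a flusher that re-queues channels it failed to flush.
class RotatedLogFlusher {
    static final long INVALID_LID = -1L;

    private List<LogChannel> logChannelsToFlush = null;
    private volatile long leastUnflushedLogId = 0;

    synchronized void addRotatedLog(LogChannel channel) {
        if (logChannelsToFlush == null) {
            logChannelsToFlush = new ArrayList<>();
        }
        logChannelsToFlush.add(channel);
    }

    void flushRotatedLogs() throws IOException {
        List<LogChannel> channels;
        synchronized (this) {
            channels = logChannelsToFlush;
            logChannelsToFlush = null;
        }
        if (channels == null) {
            return;
        }
        long flushedLogId = INVALID_LID;
        int done = 0; // number of channels flushed and closed so far
        try {
            for (LogChannel channel : channels) {
                channel.flushAndClose(); // may throw IOException
                flushedLogId = Math.max(flushedLogId, channel.getLogId());
                done++;
            }
        } finally {
            if (done < channels.size()) {
                // Put the unflushed channels back, ahead of any logs that
                // were rotated while we were flushing, so the next
                // checkpoint retries them instead of forgetting them.
                List<LogChannel> pending =
                        new ArrayList<>(channels.subList(done, channels.size()));
                synchronized (this) {
                    if (logChannelsToFlush != null) {
                        pending.addAll(logChannelsToFlush);
                    }
                    logChannelsToFlush = pending;
                }
            }
            if (flushedLogId != INVALID_LID) {
                // Assumes increasing log id order (see the note above).
                leastUnflushedLogId = flushedLogId + 1;
            }
        }
    }

    synchronized int pendingCount() {
        return logChannelsToFlush == null ? 0 : logChannelsToFlush.size();
    }

    long getLeastUnflushedLogId() {
        return leastUnflushedLogId;
    }
}

// Fake channel that fails a given number of times; only for exercising the sketch.
class FlakyChannel implements LogChannel {
    private final long logId;
    private int failuresLeft;
    boolean closed = false;

    FlakyChannel(long logId, int failures) {
        this.logId = logId;
        this.failuresLeft = failures;
    }

    public long getLogId() {
        return logId;
    }

    public void flushAndClose() throws IOException {
        if (failuresLeft > 0) {
            failuresLeft--;
            throw new IOException("simulated flush failure for log " + logId);
        }
        closed = true;
    }
}
```

With this shape, a failed flush still surfaces as an IOException to the caller, but the channel that failed (and any after it) stays queued, so the next call to flushRotatedLogs retries it. Whether SyncThread.checkpoint should additionally react to the failure (for example by notifying the dirs listener) is a separate question that this sketch does not address.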