reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180011787
########## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ########## @@ -802,88 +881,230 @@ private long readLastLogId(File f) { } } + interface EntryLogManager { + /* + * acquire lock for this ledger. + */ + void acquireLock(Long ledgerId); + + /* + * acquire lock for this ledger if it is not already available for this + * ledger then it will create a new one and then acquire lock. + */ + void acquireLockByCreatingIfRequired(Long ledgerId); + + /* + * release lock for this ledger + */ + void releaseLock(Long ledgerId); + + /* + * sets the logChannel for the given ledgerId. The previous one will be + * removed from replicaOfCurrentLogChannels. Previous logChannel will be + * added to rotatedLogChannels. + */ + void setCurrentLogForLedger(Long ledgerId, BufferedLogChannel logChannel); + + /* + * gets the logChannel for the given ledgerId. + */ + BufferedLogChannel getCurrentLogForLedger(Long ledgerId); + + /* + * gets the copy of rotatedLogChannels + */ + Set<BufferedLogChannel> getCopyOfRotatedLogChannels(); + + /* + * gets the copy of replicaOfCurrentLogChannels + */ + Set<BufferedLogChannel> getCopyOfCurrentLogs(); + + /* + * gets the active logChannel with the given entryLogId. null if it is + * not existing. + */ + BufferedLogChannel getCurrentLogIfPresent(long entryLogId); + + /* + * removes the logChannel from rotatedLogChannels collection + */ + void removeFromRotatedLogChannels(BufferedLogChannel rotatedLogChannelToRemove); + + /* + * Returns eligible writable ledger dir for the creation next entrylog + */ + File getDirForNextEntryLog(List<File> writableLedgerDirs); + + /* + * Do the operations required for checkpoint. + */ + void checkpoint() throws IOException; + + /* + * roll entryLogs. + */ + void rollLogs() throws IOException; + + /* + * gets the log id of the current activeEntryLog. 
for + * EntryLogManagerForSingleEntryLog it returns logid of current + * activelog, for EntryLogManagerForEntryLogPerLedger it would return + * some constant value. + */ + long getCurrentLogId(); + } + + class EntryLogManagerForSingleEntryLog implements EntryLogManager { + + private volatile BufferedLogChannel activeLogChannel; + private Lock lockForActiveLogChannel; + private final Set<BufferedLogChannel> rotatedLogChannels; + + EntryLogManagerForSingleEntryLog() { + rotatedLogChannels = ConcurrentHashMap.newKeySet(); + lockForActiveLogChannel = new ReentrantLock(); + } + + /* + * since entryLogPerLedger is not enabled, it is just one lock for all + * ledgers. + */ + @Override + public void acquireLock(Long ledgerId) { + lockForActiveLogChannel.lock(); + } + + @Override + public void acquireLockByCreatingIfRequired(Long ledgerId) { + acquireLock(ledgerId); + } + + @Override + public void releaseLock(Long ledgerId) { + lockForActiveLogChannel.unlock(); + } + + @Override + public void setCurrentLogForLedger(Long ledgerId, BufferedLogChannel logChannel) { + acquireLock(ledgerId); + try { + BufferedLogChannel hasToRotateLogChannel = activeLogChannel; + activeLogChannel = logChannel; + if (hasToRotateLogChannel != null) { + rotatedLogChannels.add(hasToRotateLogChannel); + } + } finally { + releaseLock(ledgerId); + } + } + + @Override + public BufferedLogChannel getCurrentLogForLedger(Long ledgerId) { + return activeLogChannel; + } + + @Override + public Set<BufferedLogChannel> getCopyOfRotatedLogChannels() { + return new HashSet<BufferedLogChannel>(rotatedLogChannels); + } + + @Override + public Set<BufferedLogChannel> getCopyOfCurrentLogs() { + HashSet<BufferedLogChannel> copyOfCurrentLogs = new HashSet<BufferedLogChannel>(); + copyOfCurrentLogs.add(activeLogChannel); + return copyOfCurrentLogs; + } + + @Override + public BufferedLogChannel getCurrentLogIfPresent(long entryLogId) { + BufferedLogChannel activeLogChannelTemp = activeLogChannel; + if 
((activeLogChannelTemp != null) && (activeLogChannelTemp.getLogId() == entryLogId)) { + return activeLogChannelTemp; + } + return null; + } + + @Override + public void removeFromRotatedLogChannels(BufferedLogChannel rotatedLogChannelToRemove) { + rotatedLogChannels.remove(rotatedLogChannelToRemove); + } + + @Override + public File getDirForNextEntryLog(List<File> writableLedgerDirs) { + Collections.shuffle(writableLedgerDirs); + return writableLedgerDirs.get(0); + } + + @Override + public void checkpoint() throws IOException { + flushRotatedLogs(); + } + + @Override + public void rollLogs() throws IOException { + createNewLog(INVALID_LEDGERID); + } + + @Override + public long getCurrentLogId() { + return activeLogChannel.getLogId(); + } + } + /** * Flushes all rotated log channels. After log channels are flushed, * move leastUnflushedLogId ptr to current logId. */ void checkpoint() throws IOException { - flushRotatedLogs(); - /* - * In the case of entryLogPerLedgerEnabled we need to flush both - * rotatedlogs and currentlogs. This is needed because syncThread - * periodically does checkpoint and at this time all the logs should - * be flushed. - * - * TODO: When EntryLogManager is introduced in the subsequent sub-tasks of - * this Issue, I will move this logic to individual implamentations of - * EntryLogManager and it would be free of this booalen flag based logic. 
- * - */ - if (entryLogPerLedgerEnabled) { - flushCurrentLog(); - } + entryLogManager.checkpoint(); } void flushRotatedLogs() throws IOException { - List<BufferedLogChannel> channels = null; - long flushedLogId = INVALID_LID; - synchronized (this) { - channels = logChannelsToFlush; - logChannelsToFlush = null; - } + Set<BufferedLogChannel> channels = entryLogManager.getCopyOfRotatedLogChannels(); if (null == channels) { return; } - Iterator<BufferedLogChannel> chIter = channels.iterator(); - while (chIter.hasNext()) { - BufferedLogChannel channel = chIter.next(); - try { - channel.flushAndForceWrite(false); - } catch (IOException ioe) { - // rescue from flush exception, add unflushed channels back - synchronized (this) { - if (null == logChannelsToFlush) { - logChannelsToFlush = channels; - } else { - logChannelsToFlush.addAll(0, channels); - } - } - throw ioe; - } - // remove the channel from the list after it is successfully flushed - chIter.remove(); + for (BufferedLogChannel channel : channels) { + channel.flushAndForceWrite(true); // since this channel is only used for writing, after flushing the channel, // we had to close the underlying file channel. Otherwise, we might end up // leaking fds which cause the disk spaces could not be reclaimed. closeFileChannel(channel); - if (channel.getLogId() > flushedLogId) { - flushedLogId = channel.getLogId(); - } + recentlyCreatedEntryLogsStatus.flushRotatedEntryLog(channel.getLogId()); + entryLogManager.removeFromRotatedLogChannels(channel); LOG.info("Synced entry logger {} to disk.", channel.getLogId()); } - // move the leastUnflushedLogId ptr - leastUnflushedLogId = flushedLogId + 1; } public void flush() throws IOException { + flushCurrentLogs(); Review comment: i think it is incorrect to call flushrotatedlogs first and then flushcurrentlogs. 
Consider this scenario: if, in between the flushRotatedLogs and flushCurrentLogs calls, the current active log is rotated and added to the rotated logs list, then we would miss flushing that log in the current flush call. So flushCurrentLogs has to be called first, and then flushRotatedLogs. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services