reddycharan commented on a change in pull request #1281: Issue #570: 
Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180011787
 
 

 ##########
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##########
 @@ -802,88 +881,230 @@ private long readLastLogId(File f) {
         }
     }
 
+    interface EntryLogManager {
+        /*
+         * acquire lock for this ledger.
+         */
+        void acquireLock(Long ledgerId);
+
+        /*
+         * acquire lock for this ledger if it is not already available for this
+         * ledger then it will create a new one and then acquire lock.
+         */
+        void acquireLockByCreatingIfRequired(Long ledgerId);
+
+        /*
+         * release lock for this ledger
+         */
+        void releaseLock(Long ledgerId);
+
+        /*
+         * sets the logChannel for the given ledgerId. The previous one will be
+         * removed from replicaOfCurrentLogChannels. Previous logChannel will 
be
+         * added to rotatedLogChannels.
+         */
+        void setCurrentLogForLedger(Long ledgerId, BufferedLogChannel 
logChannel);
+
+        /*
+         * gets the logChannel for the given ledgerId.
+         */
+        BufferedLogChannel getCurrentLogForLedger(Long ledgerId);
+
+        /*
+         * gets the copy of rotatedLogChannels
+         */
+        Set<BufferedLogChannel> getCopyOfRotatedLogChannels();
+
+        /*
+         * gets the copy of replicaOfCurrentLogChannels
+         */
+        Set<BufferedLogChannel> getCopyOfCurrentLogs();
+
+        /*
+         * gets the active logChannel with the given entryLogId. null if it is
+         * not existing.
+         */
+        BufferedLogChannel getCurrentLogIfPresent(long entryLogId);
+
+        /*
+         * removes the logChannel from rotatedLogChannels collection
+         */
+        void removeFromRotatedLogChannels(BufferedLogChannel 
rotatedLogChannelToRemove);
+
+        /*
+         * Returns eligible writable ledger dir for the creation next entrylog
+         */
+        File getDirForNextEntryLog(List<File> writableLedgerDirs);
+
+        /*
+         * Do the operations required for checkpoint.
+         */
+        void checkpoint() throws IOException;
+
+        /*
+         * roll entryLogs.
+         */
+        void rollLogs() throws IOException;
+
+        /*
+         * gets the log id of the current activeEntryLog. for
+         * EntryLogManagerForSingleEntryLog it returns logid of current
+         * activelog, for EntryLogManagerForEntryLogPerLedger it would return
+         * some constant value.
+         */
+        long getCurrentLogId();
+    }
+
+    class EntryLogManagerForSingleEntryLog implements EntryLogManager {
+
+        private volatile BufferedLogChannel activeLogChannel;
+        private Lock lockForActiveLogChannel;
+        private final Set<BufferedLogChannel> rotatedLogChannels;
+
+        EntryLogManagerForSingleEntryLog() {
+            rotatedLogChannels = ConcurrentHashMap.newKeySet();
+            lockForActiveLogChannel = new ReentrantLock();
+        }
+
+        /*
+         * since entryLogPerLedger is not enabled, it is just one lock for all
+         * ledgers.
+         */
+        @Override
+        public void acquireLock(Long ledgerId) {
+            lockForActiveLogChannel.lock();
+        }
+
+        @Override
+        public void acquireLockByCreatingIfRequired(Long ledgerId) {
+            acquireLock(ledgerId);
+        }
+
+        @Override
+        public void releaseLock(Long ledgerId) {
+            lockForActiveLogChannel.unlock();
+        }
+
+        @Override
+        public void setCurrentLogForLedger(Long ledgerId, BufferedLogChannel 
logChannel) {
+            acquireLock(ledgerId);
+            try {
+                BufferedLogChannel hasToRotateLogChannel = activeLogChannel;
+                activeLogChannel = logChannel;
+                if (hasToRotateLogChannel != null) {
+                    rotatedLogChannels.add(hasToRotateLogChannel);
+                }
+            } finally {
+                releaseLock(ledgerId);
+            }
+        }
+
+        @Override
+        public BufferedLogChannel getCurrentLogForLedger(Long ledgerId) {
+            return activeLogChannel;
+        }
+
+        @Override
+        public Set<BufferedLogChannel> getCopyOfRotatedLogChannels() {
+            return new HashSet<BufferedLogChannel>(rotatedLogChannels);
+        }
+
+        @Override
+        public Set<BufferedLogChannel> getCopyOfCurrentLogs() {
+            HashSet<BufferedLogChannel> copyOfCurrentLogs = new 
HashSet<BufferedLogChannel>();
+            copyOfCurrentLogs.add(activeLogChannel);
+            return copyOfCurrentLogs;
+        }
+
+        @Override
+        public BufferedLogChannel getCurrentLogIfPresent(long entryLogId) {
+            BufferedLogChannel activeLogChannelTemp = activeLogChannel;
+            if ((activeLogChannelTemp != null) && 
(activeLogChannelTemp.getLogId() == entryLogId)) {
+                return activeLogChannelTemp;
+            }
+            return null;
+        }
+
+        @Override
+        public void removeFromRotatedLogChannels(BufferedLogChannel 
rotatedLogChannelToRemove) {
+            rotatedLogChannels.remove(rotatedLogChannelToRemove);
+        }
+
+        @Override
+        public File getDirForNextEntryLog(List<File> writableLedgerDirs) {
+            Collections.shuffle(writableLedgerDirs);
+            return writableLedgerDirs.get(0);
+        }
+
+        @Override
+        public void checkpoint() throws IOException {
+            flushRotatedLogs();
+        }
+
+        @Override
+        public void rollLogs() throws IOException {
+            createNewLog(INVALID_LEDGERID);
+        }
+
+        @Override
+        public long getCurrentLogId() {
+            return activeLogChannel.getLogId();
+        }
+    }
+
     /**
      * Flushes all rotated log channels. After log channels are flushed,
      * move leastUnflushedLogId ptr to current logId.
      */
     void checkpoint() throws IOException {
-        flushRotatedLogs();
-        /*
-         * In the case of entryLogPerLedgerEnabled we need to flush both
-         * rotatedlogs and currentlogs. This is needed because syncThread
-         * periodically does checkpoint and at this time all the logs should
-         * be flushed.
-         *
-         * TODO: When EntryLogManager is introduced in the subsequent 
sub-tasks of
-         * this Issue, I will move this logic to individual implamentations of
-         * EntryLogManager and it would be free of this booalen flag based 
logic.
-         *
-         */
-        if (entryLogPerLedgerEnabled) {
-            flushCurrentLog();
-        }
+        entryLogManager.checkpoint();
     }
 
     void flushRotatedLogs() throws IOException {
-        List<BufferedLogChannel> channels = null;
-        long flushedLogId = INVALID_LID;
-        synchronized (this) {
-            channels = logChannelsToFlush;
-            logChannelsToFlush = null;
-        }
+        Set<BufferedLogChannel> channels = 
entryLogManager.getCopyOfRotatedLogChannels();
         if (null == channels) {
             return;
         }
-        Iterator<BufferedLogChannel> chIter = channels.iterator();
-        while (chIter.hasNext()) {
-            BufferedLogChannel channel = chIter.next();
-            try {
-                channel.flushAndForceWrite(false);
-            } catch (IOException ioe) {
-                // rescue from flush exception, add unflushed channels back
-                synchronized (this) {
-                    if (null == logChannelsToFlush) {
-                        logChannelsToFlush = channels;
-                    } else {
-                        logChannelsToFlush.addAll(0, channels);
-                    }
-                }
-                throw ioe;
-            }
-            // remove the channel from the list after it is successfully 
flushed
-            chIter.remove();
+        for (BufferedLogChannel channel : channels) {
+            channel.flushAndForceWrite(true);
             // since this channel is only used for writing, after flushing the 
channel,
             // we had to close the underlying file channel. Otherwise, we 
might end up
             // leaking fds which cause the disk spaces could not be reclaimed.
             closeFileChannel(channel);
-            if (channel.getLogId() > flushedLogId) {
-                flushedLogId = channel.getLogId();
-            }
+            
recentlyCreatedEntryLogsStatus.flushRotatedEntryLog(channel.getLogId());
+            entryLogManager.removeFromRotatedLogChannels(channel);
             LOG.info("Synced entry logger {} to disk.", channel.getLogId());
         }
-        // move the leastUnflushedLogId ptr
-        leastUnflushedLogId = flushedLogId + 1;
     }
 
     public void flush() throws IOException {
+        flushCurrentLogs();
 
 Review comment:
   I think it is incorrect to call flushRotatedLogs first and then 
flushCurrentLogs. Consider this scenario: if, in between the flushRotatedLogs 
and flushCurrentLogs calls, the current active log is rotated and added to the 
rotated logs list, then we would miss flushing that log in this flush call. So 
the order has to be flushCurrentLogs first and then flushRotatedLogs.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to