Murtadha Hubail has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/867

Change subject: ASTERIXDB-1i425 & ASTERIXDB-1450: Fix LogReader random reads
......................................................................

ASTERIXDB-1i425 & ASTERIXDB-1450: Fix LogReader random reads

- Fix random reads for truncated logs (ASTERIXDB-1425).
- Fix log file partition size boundary check (ASTERIXDB-1450).
- Fix deadlock between LogReader and LogFlusher.
- Prevent checkpoints from deleting log files being accessed by rollbacks.
- Make rollbacks start from LSN = max(txnFirstLSN, minMemoryLSN).
- Make default log partition size 250MB instead of 2GB.

Change-Id: I1c75ca4a7c8fe197451126392389d4baecbd7e45
---
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixTransactionProperties.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
A 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/TxnLogFile.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
6 files changed, 256 insertions(+), 57 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/67/867/1

diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixTransactionProperties.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixTransactionProperties.java
index c4a5e8e..6c3b0ea 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixTransactionProperties.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixTransactionProperties.java
@@ -32,7 +32,7 @@
     private static final int TXN_LOG_BUFFER_PAGESIZE_DEFAULT = (128 << 10); // 
128KB
 
     private static final String TXN_LOG_PARTITIONSIZE_KEY = 
"txn.log.partitionsize";
-    private static final long TXN_LOG_PARTITIONSIZE_DEFAULT = ((long) 2 << 
30); // 2GB
+    private static final long TXN_LOG_PARTITIONSIZE_DEFAULT = 
StorageUtil.getSizeInBytes(250L, StorageUnit.MEGABYTE);
 
     private static final String TXN_LOG_CHECKPOINT_LSNTHRESHOLD_KEY = 
"txn.log.checkpoint.lsnthreshold";
     private static final int TXN_LOG_CHECKPOINT_LSNTHRESHOLD_DEFAULT = (64 << 
20); // 64M
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
index cff4184..0600a98 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
@@ -85,4 +85,23 @@
      */
     public int getNumLogPages();
 
+    /**
+     * Opens a file channel to the log file which contains {@code LSN}.
+     * The start position of the file channel will be at the first LSN of the 
file.
+     *
+     * @param LSN
+     * @return
+     * @throws IOException
+     *             if the log file does not exist.
+     */
+    public TxnLogFile getLogFile(long LSN) throws IOException;
+
+    /**
+     * Closes the log file.
+     *
+     * @param logFileRef
+     * @throws IOException
+     *             if the file reference was already closed.
+     */
+    public void closeLogFile(TxnLogFile logFileRef) throws IOException;
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/TxnLogFile.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/TxnLogFile.java
new file mode 100644
index 0000000..9082482
--- /dev/null
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/TxnLogFile.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.transactions;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+public class TxnLogFile {
+
+    private final FileChannel fileChannel;
+    private final long logFileId;
+    private final long fileBeginLSN;
+    private final ILogManager logManager;
+    private boolean open = true;
+
+    public TxnLogFile(ILogManager logManager, FileChannel fileChannel, long 
logFileId, long fileBeginLSN) {
+        this.logManager = logManager;
+        this.fileChannel = fileChannel;
+        this.logFileId = logFileId;
+        this.fileBeginLSN = fileBeginLSN;
+    }
+
+    public void position(long newPosition) throws IOException {
+        fileChannel.position(newPosition);
+    }
+
+    public long size() throws IOException {
+        return fileChannel.size();
+    }
+
+    public int read(ByteBuffer readBuffer) throws IOException {
+        return fileChannel.read(readBuffer);
+    }
+
+    public long getLogFileId() {
+        return logFileId;
+    }
+
+    public synchronized void close() throws IOException {
+        if (open) {
+            fileChannel.close();
+            logManager.closeLogFile(this);
+            open = false;
+        }
+    }
+
+    public long getFileBeginLSN() {
+        return fileBeginLSN;
+    }
+}
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index f10520b..f214baa 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -27,6 +27,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -47,6 +48,7 @@
 import org.apache.asterix.common.transactions.LogManagerProperties;
 import org.apache.asterix.common.transactions.LogType;
 import org.apache.asterix.common.transactions.MutableLong;
+import org.apache.asterix.common.transactions.TxnLogFile;
 import 
org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
 
@@ -74,6 +76,7 @@
     private final String nodeId;
     protected LinkedBlockingQueue<ILogRecord> flushLogsQ;
     private final FlushLogsLogger flushLogsLogger;
+    private final HashMap<Long, Integer> txnLogFileId2ReaderCount = new 
HashMap<>();
 
     public LogManager(TransactionSubsystem txnSubsystem) {
         this.txnSubsystem = txnSubsystem;
@@ -148,7 +151,13 @@
                         "Aborted job(" + txnCtx.getJobId() + ") tried to write 
non-abort type log record.");
             }
         }
-        if (getLogFileOffset(appendLSN.get()) + logRecord.getLogSize() > 
logFileSize) {
+
+        /**
+         * To eliminate the case where the modulo of the next appendLSN = 0 
(the next
+         * appendLSN = the first LSN of the next log file), we do not allow a 
log to be
+         * written at the last offset of the current file.
+         */
+        if (getLogFileOffset(appendLSN.get()) + logRecord.getLogSize() >= 
logFileSize) {
             prepareNextLogFile();
             appendPage.isFull(true);
             getAndInitNewPage();
@@ -193,9 +202,26 @@
         flushQ.offer(appendPage);
     }
 
-    protected void prepareNextLogFile() {
+    protected void prepareNextLogFile() throws ACIDException {
+        //wait until all log records have been flushed in the current file
+        synchronized (flushLSN) {
+            while (flushLSN.get() != appendLSN.get()) {
+                //notification will come from LogBuffer.internalFlush(.)
+                try {
+                    flushLSN.wait();
+                } catch (InterruptedException e) {
+                    throw new ACIDException(e);
+                }
+            }
+        }
+        //move appendLSN and flushLSN to the first LSN of the next log file
         appendLSN.addAndGet(logFileSize - getLogFileOffset(appendLSN.get()));
+        flushLSN.set(appendLSN.get());
         appendChannel = getFileChannel(appendLSN.get(), true);
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Created new txn log file with id(" + 
getLogFileId(appendLSN.get()) + ") starting with LSN = "
+                    + appendLSN.get());
+        }
         appendPage.isLastPage(true);
         //[Notice]
         //the current log file channel is closed if
@@ -323,15 +349,30 @@
 
     @Override
     public void deleteOldLogFiles(long checkpointLSN) {
-
         Long checkpointLSNLogFileID = getLogFileId(checkpointLSN);
         List<Long> logFileIds = getLogFileIds();
         if (logFileIds != null) {
-            for (Long id : logFileIds) {
-                if (id < checkpointLSNLogFileID) {
-                    File file = new File(getLogFilePath(id));
-                    if (!file.delete()) {
-                        throw new IllegalStateException("Failed to delete a 
file: " + file.getAbsolutePath());
+            /**
+             * At this point, any future LogReader should read from LSN >= 
checkpointLSN
+             */
+            synchronized (txnLogFileId2ReaderCount) {
+                for (Long id : logFileIds) {
+                    if (id < checkpointLSNLogFileID) {
+                        //make sure there is no LogReader currently accessing 
the file.
+                        if (txnLogFileId2ReaderCount.containsKey(id)) {
+                            if (txnLogFileId2ReaderCount.get(id) > 0) {
+                                continue;
+                            }
+                        }
+                        File file = new File(getLogFilePath(id));
+                        if (!file.delete()) {
+                            throw new IllegalStateException("Failed to delete 
a file: " + file.getAbsolutePath());
+                        } else {
+                            txnLogFileId2ReaderCount.remove(id);
+                            if (LOGGER.isLoggable(Level.INFO)) {
+                                LOGGER.info("Deleted log file " + 
file.getAbsolutePath());
+                            }
+                        }
                     }
                 }
             }
@@ -365,6 +406,7 @@
                 throw new IllegalStateException("Failed to close a fileChannel 
of a log file");
             }
         }
+        txnLogFileId2ReaderCount.clear();
         List<Long> logFileIds = getLogFileIds();
         if (logFileIds != null) {
             for (Long id : logFileIds) {
@@ -434,7 +476,7 @@
         return (new File(path)).mkdir();
     }
 
-    public FileChannel getFileChannel(long lsn, boolean create) {
+    private FileChannel getFileChannel(long lsn, boolean create) {
         FileChannel newFileChannel = null;
         try {
             long fileId = getLogFileId(lsn);
@@ -496,6 +538,51 @@
         return numLogPages;
     }
 
+    @Override
+    public TxnLogFile getLogFile(long LSN) throws IOException {
+        long fileId = getLogFileId(LSN);
+        String logFilePath = getLogFilePath(fileId);
+        File file = new File(logFilePath);
+        if (!file.exists()) {
+            throw new IOException("Log file with id(" + fileId + ") was not 
found. Requested LSN: " + LSN);
+        }
+        RandomAccessFile raf = new RandomAccessFile(new File(logFilePath), 
"r");
+        FileChannel newFileChannel = raf.getChannel();
+        TxnLogFile logFile = new TxnLogFile(this, newFileChannel, fileId, 
fileId * logFileSize);
+        touchLogFile(fileId);
+        return logFile;
+    }
+
+    @Override
+    public void closeLogFile(TxnLogFile logFileRef) throws IOException {
+        untouchLogFile(logFileRef.getLogFileId());
+    }
+
+    private void touchLogFile(long fileId) {
+        synchronized (txnLogFileId2ReaderCount) {
+            if (txnLogFileId2ReaderCount.containsKey(fileId)) {
+                txnLogFileId2ReaderCount.put(fileId, 
txnLogFileId2ReaderCount.get(fileId) + 1);
+            } else {
+                txnLogFileId2ReaderCount.put(fileId, 1);
+            }
+        }
+    }
+
+    private void untouchLogFile(long fileId) {
+        synchronized (txnLogFileId2ReaderCount) {
+            if (txnLogFileId2ReaderCount.containsKey(fileId)) {
+                int newReaderCount = txnLogFileId2ReaderCount.get(fileId) - 1;
+                if (newReaderCount < 0) {
+                    throw new IllegalStateException(
+                            "Invalid log file reader count (ID=" + fileId + ", 
count: " + newReaderCount + ")");
+                }
+                txnLogFileId2ReaderCount.put(fileId, newReaderCount);
+            } else {
+                throw new IllegalStateException("Trying to close log file id(" 
+ fileId + ") which was not opened.");
+            }
+        }
+    }
+
     /**
      * This class is used to log FLUSH logs.
      * FLUSH logs are flushed on a different thread to avoid a possible 
deadlock in LogBuffer batchUnlock which calls 
PrimaryIndexOpeartionTracker.completeOperation
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java
index 0b1d320..d10e0d5 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java
@@ -20,15 +20,16 @@
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.asterix.common.transactions.ILogReader;
 import org.apache.asterix.common.transactions.ILogRecord;
 import org.apache.asterix.common.transactions.ILogRecord.RecordReadStatus;
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.transactions.MutableLong;
+import org.apache.asterix.common.transactions.TxnLogFile;
 
 /**
  * NOTE: Many method calls of this class are not thread safe.
@@ -38,7 +39,7 @@
 
     public static final boolean IS_DEBUG_MODE = false;//true
     private static final Logger LOGGER = 
Logger.getLogger(LogReader.class.getName());
-    private final LogManager logMgr;
+    private final ILogManager logMgr;
     private final long logFileSize;
     private final int logPageSize;
     private final MutableLong flushLSN;
@@ -48,14 +49,15 @@
     private long readLSN;
     private long bufferBeginLSN;
     private long fileBeginLSN;
-    private FileChannel fileChannel;
+    private TxnLogFile logFile;
 
     private enum ReturnState {
         FLUSH,
         EOF
     };
 
-    public LogReader(LogManager logMgr, long logFileSize, int logPageSize, 
MutableLong flushLSN, boolean isRecoveryMode) {
+    public LogReader(ILogManager logMgr, long logFileSize, int logPageSize, 
MutableLong flushLSN,
+            boolean isRecoveryMode) {
         this.logMgr = logMgr;
         this.logFileSize = logFileSize;
         this.logPageSize = logPageSize;
@@ -71,12 +73,13 @@
         if (waitForFlushOrReturnIfEOF() == ReturnState.EOF) {
             return;
         }
-        getFileChannel();
+        getLogFile();
         fillLogReadBuffer();
     }
 
     /**
      * Get the next log record from the log file.
+     * 
      * @return A deserialized log record, or null if we have reached the end 
of the file.
      * @throws ACIDException
      */
@@ -119,7 +122,8 @@
                     continue;
                 }
                 case BAD_CHKSUM: {
-                    LOGGER.severe("Transaction log contains corrupt log 
records (perhaps due to medium error). Stopping recovery early.");
+                    LOGGER.severe(
+                            "Transaction log contains corrupt log records 
(perhaps due to medium error). Stopping recovery early.");
                     return null;
                 }
                 case OK:
@@ -136,14 +140,14 @@
 
     private ReturnState waitForFlushOrReturnIfEOF() {
         synchronized (flushLSN) {
-            while (readLSN >= flushLSN.get()) {
+            while (readLSN > flushLSN.get()) {
                 if (isRecoveryMode) {
                     return ReturnState.EOF;
                 }
                 try {
                     if (IS_DEBUG_MODE) {
-                        LOGGER.info("waitForFlushOrReturnIfEOF()| flushLSN: " 
+ flushLSN.get() + ", readLSN: "
-                                + readLSN);
+                        LOGGER.info(
+                                "waitForFlushOrReturnIfEOF()| flushLSN: " + 
flushLSN.get() + ", readLSN: " + readLSN);
                     }
                     flushLSN.wait();
                 } catch (InterruptedException e) {
@@ -156,15 +160,16 @@
 
     /**
      * Continues log analysis between log file splits.
+     * 
      * @return true if log continues, false if EOF
      * @throws ACIDException
      */
     private boolean refillLogReadBuffer() throws ACIDException {
         try {
-            if (readLSN % logFileSize == fileChannel.size()) {
-                fileChannel.close();
+            if (readLSN % logFileSize == logFile.size()) {
+                logFile.close();
                 readLSN += logFileSize - (readLSN % logFileSize);
-                getFileChannel();
+                getLogFile();
             }
             return fillLogReadBuffer();
         } catch (IOException e) {
@@ -174,6 +179,7 @@
 
     /**
      * Fills the log buffer with data from the log file at the current position
+     * 
      * @return false if EOF, true otherwise
      * @throws ACIDException
      */
@@ -183,17 +189,17 @@
     }
 
     private boolean fillLogReadBuffer(int readSize, ByteBuffer readBuffer) 
throws ACIDException {
-        int size=0;
-        int read=0;
+        int size = 0;
+        int read = 0;
         readBuffer.position(0);
         readBuffer.limit(readSize);
         try {
-            fileChannel.position(readLSN % logFileSize);
+            logFile.position(readLSN % logFileSize);
             //We loop here because read() may return 0, but this simply means 
we are waiting on IO.
             //Therefore we want to break out only when either the buffer is 
full, or we reach EOF.
-            while( size < readSize && read != -1) {
-                read = fileChannel.read(readBuffer);
-                if(read>0) {
+            while (size < readSize && read != -1) {
+                read = logFile.read(readBuffer);
+                if (read > 0) {
                     size += read;
                 }
             }
@@ -202,7 +208,7 @@
         }
         readBuffer.position(0);
         readBuffer.limit(size);
-        if(size == 0 && read == -1){
+        if (size == 0 && read == -1) {
             return false; //EOF
         }
         bufferBeginLSN = readLSN;
@@ -213,38 +219,38 @@
     @Override
     public ILogRecord read(long LSN) throws ACIDException {
         readLSN = LSN;
+        //wait for the log to be flushed if needed before trying to read it.
         synchronized (flushLSN) {
-            while (readLSN >= flushLSN.get()) {
+            while (readLSN > flushLSN.get()) {
                 try {
                     flushLSN.wait();
                 } catch (InterruptedException e) {
-                    //ignore
+                    //Txn roll back will not be completed!
+                    return null;
                 }
             }
         }
         try {
-            if (fileChannel == null) {
-                getFileChannel();
+            if (logFile == null) {
+                //get the log file which contains readLSN
+                getLogFile();
                 fillLogReadBuffer();
-            } else if (readLSN < fileBeginLSN || readLSN >= fileBeginLSN + 
fileChannel.size()) {
-                fileChannel.close();
-                getFileChannel();
+            } else if (readLSN < fileBeginLSN || readLSN >= fileBeginLSN + 
logFile.size()) {
+                //log is not in the current log file
+                logFile.close();
+                getLogFile();
                 fillLogReadBuffer();
             } else if (readLSN < bufferBeginLSN || readLSN >= bufferBeginLSN + 
readBuffer.limit()) {
+                //log is not in the current read buffer
                 fillLogReadBuffer();
             } else {
+                //log is either completely in the current read buffer or 
truncated
                 readBuffer.position((int) (readLSN - bufferBeginLSN));
             }
         } catch (IOException e) {
             throw new ACIDException(e);
         }
-        boolean hasRemaining;
-        if(readBuffer.position() == readBuffer.limit()){
-            hasRemaining = refillLogReadBuffer();
-            if(!hasRemaining){
-                throw new ACIDException("LSN is out of bounds");
-            }
-        }
+
         ByteBuffer readBuffer = this.readBuffer;
         while (true) {
             RecordReadStatus status = logRecord.readLogRecord(readBuffer);
@@ -256,14 +262,15 @@
                     continue;
                 }
                 case TRUNCATED: {
-                    throw new ACIDException("LSN is out of bounds");
+                    fillLogReadBuffer();
+                    //now read the complete log record
+                    continue;
                 }
                 case BAD_CHKSUM: {
                     throw new ACIDException("Log record has incorrect 
checksum");
                 }
                 case OK:
                     break;
-
             }
             break;
         }
@@ -272,16 +279,20 @@
         return logRecord;
     }
 
-    private void getFileChannel() throws ACIDException {
-        fileChannel = logMgr.getFileChannel(readLSN, false);
-        fileBeginLSN = readLSN;
+    private void getLogFile() throws ACIDException {
+        try {
+            logFile = logMgr.getLogFile(readLSN);
+            fileBeginLSN = logFile.getFileBeginLSN();
+        } catch (IOException e) {
+            throw new ACIDException(e);
+        }
     }
 
     @Override
     public void close() throws ACIDException {
         try {
-            if (fileChannel != null) {
-                fileChannel.close();
+            if (logFile != null) {
+                logFile.close();
             }
         } catch (IOException e) {
             throw new ACIDException(e);
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
index 3e5c6cf..4e09dfd 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -689,18 +689,34 @@
         int abortedJobId = txnContext.getJobId().getId();
         // Obtain the first/last log record LSNs written by the Job
         long firstLSN = txnContext.getFirstLSN();
+        /**
+         * The effect of any log record with LSN below minFirstLSN has already 
been written to disk and
+         * will not be rolled back. Therefore, we will set the first LSN of 
the job to the maximum of
+         * minFirstLSN and the job's first LSN.
+         */
+        try {
+            long localMinFirstLSN = getLocalMinFirstLSN();
+            firstLSN = Math.max(firstLSN, localMinFirstLSN);
+        } catch (HyracksDataException e) {
+            throw new ACIDException(e);
+        }
         long lastLSN = txnContext.getLastLSN();
-
-        LOGGER.log(Level.INFO, "rollbacking transaction log records from " + 
firstLSN + " to " + lastLSN);
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("rollbacking transaction log records from " + firstLSN 
+ " to " + lastLSN);
+        }
         // check if the transaction actually wrote some logs.
-        if (firstLSN == 
TransactionManagementConstants.LogManagerConstants.TERMINAL_LSN) {
-            LOGGER.log(Level.INFO,
-                    "no need to roll back as there were no operations by the 
transaction " + txnContext.getJobId());
+        if (firstLSN == 
TransactionManagementConstants.LogManagerConstants.TERMINAL_LSN || firstLSN >= 
lastLSN) {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info(
+                        "no need to roll back as there were no operations by 
the transaction " + txnContext.getJobId());
+            }
             return;
         }
 
         // While reading log records from firstLsn to lastLsn, collect 
uncommitted txn's Lsns
-        LOGGER.log(Level.INFO, "collecting loser transaction's LSNs from " + 
firstLSN + " to " + lastLSN);
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("collecting loser transaction's LSNs from " + firstLSN 
+ " to " + lastLSN);
+        }
 
         Map<TxnId, List<Long>> jobLoserEntity2LSNsMap = new HashMap<TxnId, 
List<Long>>();
         TxnId tempKeyTxnId = new TxnId(-1, -1, -1, null, -1, false);
@@ -812,7 +828,6 @@
                 LOGGER.info("[RecoveryManager's rollback log count] 
update/entityCommit/undo:" + updateLogCount + "/"
                         + entityCommitLogCount + "/" + undoCount);
             }
-
         } finally {
             logReader.close();
         }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/867
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I1c75ca4a7c8fe197451126392389d4baecbd7e45
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <[email protected]>

Reply via email to