abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1719

Change subject: Make LogManager More modular
......................................................................

Make LogManager More modular

The motivation behind this change is to facilitate testing
of log manager and components that interact with log manager
through the overriding of the behaviour of the log buffer.

Change-Id: I9594d381cea9bb21f4ad4841a9357bdb5ba37349
---
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java
A 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBufferFactory.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
A 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBufferFactory.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
A 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/ReplicatingLogBuffer.java
A 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/ReplicatingLogBufferFactory.java
9 files changed, 303 insertions(+), 89 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/19/1719/1

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
index ec1a386..0e6cf9a 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
@@ -36,8 +36,10 @@
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.common.utils.StorageConstants;
 import 
org.apache.asterix.transaction.management.service.locking.ConcurrentLockManager;
+import 
org.apache.asterix.transaction.management.service.logging.LogBufferFactory;
 import org.apache.asterix.transaction.management.service.logging.LogManager;
 import 
org.apache.asterix.transaction.management.service.logging.LogManagerWithReplication;
+import 
org.apache.asterix.transaction.management.service.logging.ReplicatingLogBufferFactory;
 import 
org.apache.asterix.transaction.management.service.recovery.CheckpointManagerFactory;
 import 
org.apache.asterix.transaction.management.service.transaction.TransactionManager;
 import org.apache.hyracks.api.application.INCServiceContext;
@@ -68,8 +70,8 @@
         this.txnProperties = txnProperties;
         this.transactionManager = new TransactionManager(this);
         this.lockManager = new 
ConcurrentLockManager(txnProperties.getLockManagerShrinkTimer());
-        ReplicationProperties repProperties = 
asterixAppRuntimeContextProvider.getAppContext()
-                .getReplicationProperties();
+        ReplicationProperties repProperties =
+                
asterixAppRuntimeContextProvider.getAppContext().getReplicationProperties();
         IReplicationStrategy replicationStrategy = 
repProperties.getReplicationStrategy();
         final boolean replicationEnabled = repProperties.isParticipant(id);
 
@@ -83,9 +85,10 @@
         }
 
         if (replicationEnabled) {
-            this.logManager = new LogManagerWithReplication(this, 
replicationStrategy);
+            this.logManager =
+                    new LogManagerWithReplication(this, 
ReplicatingLogBufferFactory.INSTANCE, replicationStrategy);
         } else {
-            this.logManager = new LogManager(this);
+            this.logManager = new LogManager(this, LogBufferFactory.INSTANCE);
         }
         this.recoveryManager = new RecoveryManager(this, serviceCtx);
 
@@ -183,8 +186,8 @@
             long currentTimeStamp = System.currentTimeMillis();
             long currentEntityCommitCount = 
txnSubsystem.profilerEntityCommitLogCount;
 
-            LOGGER.severe("EntityCommitProfiler ReportRound[" + reportRound + 
"], AbsoluteTimeStamp["
-                    + currentTimeStamp + "], ActualRelativeTimeStamp[" + 
(currentTimeStamp - startTimeStamp)
+            LOGGER.severe("EntityCommitProfiler ReportRound[" + reportRound + 
"], AbsoluteTimeStamp[" + currentTimeStamp
+                    + "], ActualRelativeTimeStamp[" + (currentTimeStamp - 
startTimeStamp)
                     + "], ExpectedRelativeTimeStamp[" + 
(reportIntervalInSeconds * reportRound) + "], IIPS["
                     + ((currentEntityCommitCount - lastEntityCommitCount) / 
reportIntervalInSeconds) + "], IPS["
                     + (currentEntityCommitCount / (reportRound * 
reportIntervalInSeconds)) + "]");
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java
index bfb179d..8e67603 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java
@@ -18,12 +18,60 @@
  */
 package org.apache.asterix.common.transactions;
 
+import java.nio.channels.FileChannel;
+
 public interface ILogBuffer {
 
-    public void append(ILogRecord logRecord, long appendLsn);
+    /**
+     * append a log record
+     *
+     * @param logRecord
+     *            the log record to be appended
+     * @param appendLsn
+     *            the lsn for the record in the log file
+     */
+    void append(ILogRecord logRecord, long appendLsn);
 
-    public void flush();
+    /**
+     * flush content of buffer to disk
+     */
+    void flush();
 
-    public void appendWithReplication(ILogRecord logRecord, long appendLSN);
+    /**
+     * @param logSize
+     * @return true if buffer has enough space to append log of size logSize, 
false otherwise
+     */
+    boolean hasSpace(int logSize);
 
+    /**
+     * Set buffer to be full
+     */
+    void setFull();
+
+    /**
+     * Associate the buffer with a file channel
+     *
+     * @param fileChannel
+     */
+    void setFileChannel(FileChannel fileChannel);
+
+    /**
+     * reset the buffer for re-use
+     */
+    void reset();
+
+    /**
+     * Set current page to be the last page of the associated file
+     */
+    void setLastPage();
+
+    /**
+     * stops the log buffer
+     */
+    void stop();
+
+    /**
+     * @return the default log page size in this buffer
+     */
+    int getLogPageSize();
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBufferFactory.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBufferFactory.java
new file mode 100644
index 0000000..4a88dcc
--- /dev/null
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBufferFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.transactions;
+
+@FunctionalInterface
+public interface ILogBufferFactory {
+
+    /**
+     * Create a log buffer
+     *
+     * @param txnSubsystem
+     *            the transaction subsystem
+     * @param logPageSize
+     *            the default log page size
+     * @param flushLsn
+     *            a mutable long used to communicate progress
+     * @return a in instance of ILogBuffer
+     */
+    ILogBuffer create(ITransactionSubsystem txnSubsystem, int logPageSize, 
MutableLong flushLsn);
+}
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
index 033cbe4..831bace 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
@@ -50,15 +50,15 @@
     private final int logPageSize;
     private final MutableLong flushLSN;
     private final AtomicBoolean full;
-    private int appendOffset;
+    protected int appendOffset;
     private int flushOffset;
-    private final ByteBuffer appendBuffer;
+    protected final ByteBuffer appendBuffer;
     private final ByteBuffer flushBuffer;
     private final ByteBuffer unlockBuffer;
     private boolean isLastPage;
-    private final LinkedBlockingQueue<ILogRecord> syncCommitQ;
-    private final LinkedBlockingQueue<ILogRecord> flushQ;
-    private final LinkedBlockingQueue<ILogRecord> remoteJobsQ;
+    protected final LinkedBlockingQueue<ILogRecord> syncCommitQ;
+    protected final LinkedBlockingQueue<ILogRecord> flushQ;
+    protected final LinkedBlockingQueue<ILogRecord> remoteJobsQ;
     private FileChannel fileChannel;
     private boolean stop;
     private final JobId reusableJobId;
@@ -112,37 +112,6 @@
     }
 
     @Override
-    public void appendWithReplication(ILogRecord logRecord, long appendLSN) {
-        logRecord.writeLogRecord(appendBuffer);
-
-        if (logRecord.getLogSource() == LogSource.LOCAL && 
logRecord.getLogType() != LogType.FLUSH
-                && logRecord.getLogType() != LogType.WAIT) {
-            logRecord.getTxnCtx().setLastLSN(appendLSN);
-        }
-
-        synchronized (this) {
-            appendOffset += logRecord.getLogSize();
-            if (IS_DEBUG_MODE) {
-                LOGGER.info("append()| appendOffset: " + appendOffset);
-            }
-            if (logRecord.getLogSource() == LogSource.LOCAL) {
-                if (logRecord.getLogType() == LogType.JOB_COMMIT || 
logRecord.getLogType() == LogType.ABORT
-                        || logRecord.getLogType() == LogType.WAIT) {
-                    logRecord.isFlushed(false);
-                    syncCommitQ.offer(logRecord);
-                }
-                if (logRecord.getLogType() == LogType.FLUSH) {
-                    logRecord.isFlushed(false);
-                    flushQ.offer(logRecord);
-                }
-            } else if (logRecord.getLogSource() == LogSource.REMOTE
-                    && (logRecord.getLogType() == LogType.JOB_COMMIT || 
logRecord.getLogType() == LogType.ABORT)) {
-                remoteJobsQ.offer(logRecord);
-            }
-            this.notify();
-        }
-    }
-
     public void setFileChannel(FileChannel fileChannel) {
         this.fileChannel = fileChannel;
     }
@@ -155,19 +124,23 @@
         }
     }
 
-    public synchronized void isFull(boolean full) {
-        this.full.set(full);
+    @Override
+    public synchronized void setFull() {
+        this.full.set(true);
         this.notify();
     }
 
-    public void isLastPage(boolean isLastPage) {
-        this.isLastPage = isLastPage;
+    @Override
+    public void setLastPage() {
+        this.isLastPage = true;
     }
 
+    @Override
     public boolean hasSpace(int logSize) {
         return appendOffset + logSize <= logPageSize;
     }
 
+    @Override
     public void reset() {
         appendBuffer.position(0);
         appendBuffer.limit(logPageSize);
@@ -345,14 +318,12 @@
         }
     }
 
-    public boolean isStop() {
-        return stop;
+    @Override
+    public void stop() {
+        this.stop = true;
     }
 
-    public void isStop(boolean stop) {
-        this.stop = stop;
-    }
-
+    @Override
     public int getLogPageSize() {
         return logPageSize;
     }
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBufferFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBufferFactory.java
new file mode 100644
index 0000000..27fdfd1
--- /dev/null
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBufferFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.service.logging;
+
+import org.apache.asterix.common.transactions.ILogBuffer;
+import org.apache.asterix.common.transactions.ILogBufferFactory;
+import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.asterix.common.transactions.MutableLong;
+
+public class LogBufferFactory implements ILogBufferFactory {
+    public static final LogBufferFactory INSTANCE = new LogBufferFactory();
+
+    private LogBufferFactory() {
+    }
+
+    @Override
+    public ILogBuffer create(ITransactionSubsystem txnSubsystem, int 
logPageSize, MutableLong flushLsn) {
+        return new LogBuffer(txnSubsystem, logPageSize, flushLsn);
+    }
+}
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index d0f49db..d3e6baf 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -40,6 +40,8 @@
 
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.replication.IReplicationManager;
+import org.apache.asterix.common.transactions.ILogBuffer;
+import org.apache.asterix.common.transactions.ILogBufferFactory;
 import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.asterix.common.transactions.ILogReader;
 import org.apache.asterix.common.transactions.ILogRecord;
@@ -54,36 +56,46 @@
 
 public class LogManager implements ILogManager, ILifeCycleComponent {
 
-    public static final boolean IS_DEBUG_MODE = false;// true
+    /*
+     * Constants
+     */
     private static final Logger LOGGER = 
Logger.getLogger(LogManager.class.getName());
+    private static final long SMALLEST_LOG_FILE_ID = 0;
+    private static final int INITIAL_LOG_SIZE = 0;
+    public static final boolean IS_DEBUG_MODE = false;// true
+    /*
+     * Finals
+     */
     private final ITransactionSubsystem txnSubsystem;
-
+    private final ILogBufferFactory logBufferFactory;
     private final LogManagerProperties logManagerProperties;
-    protected final long logFileSize;
-    protected final int logPageSize;
     private final int numLogPages;
     private final String logDir;
     private final String logFilePrefix;
     private final MutableLong flushLSN;
-    private LinkedBlockingQueue<LogBuffer> emptyQ;
-    private LinkedBlockingQueue<LogBuffer> flushQ;
-    private LinkedBlockingQueue<LogBuffer> stashQ;
-    protected final AtomicLong appendLSN;
-    private FileChannel appendChannel;
-    protected LogBuffer appendPage;
-    private LogFlusher logFlusher;
-    private Future<? extends Object> futureLogFlusher;
-    private static final long SMALLEST_LOG_FILE_ID = 0;
-    private static final int INITIAL_LOG_SIZE = 0;
     private final String nodeId;
-    protected LinkedBlockingQueue<ILogRecord> flushLogsQ;
     private final FlushLogsLogger flushLogsLogger;
     private final HashMap<Long, Integer> txnLogFileId2ReaderCount = new 
HashMap<>();
+    protected final long logFileSize;
+    protected final int logPageSize;
+    protected final AtomicLong appendLSN;
+    /*
+     * Mutables
+     */
+    private LinkedBlockingQueue<ILogBuffer> emptyQ;
+    private LinkedBlockingQueue<ILogBuffer> flushQ;
+    private LinkedBlockingQueue<ILogBuffer> stashQ;
+    private FileChannel appendChannel;
+    protected ILogBuffer appendPage;
+    private LogFlusher logFlusher;
+    private Future<? extends Object> futureLogFlusher;
+    protected LinkedBlockingQueue<ILogRecord> flushLogsQ;
 
-    public LogManager(ITransactionSubsystem txnSubsystem) {
+    public LogManager(ITransactionSubsystem txnSubsystem, ILogBufferFactory 
logBufferFactory) {
         this.txnSubsystem = txnSubsystem;
-        logManagerProperties = new 
LogManagerProperties(this.txnSubsystem.getTransactionProperties(),
-                this.txnSubsystem.getId());
+        this.logBufferFactory = logBufferFactory;
+        logManagerProperties =
+                new 
LogManagerProperties(this.txnSubsystem.getTransactionProperties(), 
this.txnSubsystem.getId());
         logFileSize = logManagerProperties.getLogPartitionSize();
         logPageSize = logManagerProperties.getLogPageSize();
         numLogPages = logManagerProperties.getNumLogPages();
@@ -102,7 +114,7 @@
         flushQ = new LinkedBlockingQueue<>(numLogPages);
         stashQ = new LinkedBlockingQueue<>(numLogPages);
         for (int i = 0; i < numLogPages; i++) {
-            emptyQ.offer(new LogBuffer(txnSubsystem, logPageSize, flushLSN));
+            emptyQ.offer(logBufferFactory.create(txnSubsystem, logPageSize, 
flushLSN));
         }
         appendLSN.set(initializeLogAnchor(nextLogFileId));
         flushLSN.set(appendLSN.get());
@@ -178,7 +190,7 @@
     }
 
     protected void prepareNextPage(int logSize) {
-        appendPage.isFull(true);
+        appendPage.setFull();
         getAndInitNewPage(logSize);
     }
 
@@ -196,7 +208,7 @@
             }
             // for now, alloc a new buffer for each large page
             // TODO: pool large pages??
-            appendPage = new LogBuffer(txnSubsystem, logSize, flushLSN);
+            appendPage = logBufferFactory.create(txnSubsystem, logSize, 
flushLSN);
             appendPage.setFileChannel(appendChannel);
             flushQ.offer(appendPage);
         } else {
@@ -216,9 +228,9 @@
 
     protected void prepareNextLogFile() {
         // Mark the page as the last page so that it will close the output 
file channel.
-        appendPage.isLastPage(true);
+        appendPage.setLastPage();
         // Make sure to flush whatever left in the log tail.
-        appendPage.isFull(true);
+        appendPage.setFull();
         //wait until all log records have been flushed in the current file
         synchronized (flushLSN) {
             try {
@@ -627,17 +639,18 @@
 
 class LogFlusher implements Callable<Boolean> {
     private static final Logger LOGGER = 
Logger.getLogger(LogFlusher.class.getName());
-    private final static LogBuffer POISON_PILL = new LogBuffer(null, 
ILogRecord.JOB_TERMINATE_LOG_SIZE, null);
+    private static final ILogBuffer POISON_PILL =
+            LogBufferFactory.INSTANCE.create(null, 
ILogRecord.JOB_TERMINATE_LOG_SIZE, null);
     private final LogManager logMgr;//for debugging
-    private final LinkedBlockingQueue<LogBuffer> emptyQ;
-    private final LinkedBlockingQueue<LogBuffer> flushQ;
-    private final LinkedBlockingQueue<LogBuffer> stashQ;
-    private LogBuffer flushPage;
+    private final LinkedBlockingQueue<ILogBuffer> emptyQ;
+    private final LinkedBlockingQueue<ILogBuffer> flushQ;
+    private final LinkedBlockingQueue<ILogBuffer> stashQ;
+    private ILogBuffer flushPage;
     private final AtomicBoolean isStarted;
     private final AtomicBoolean terminateFlag;
 
-    public LogFlusher(LogManager logMgr, LinkedBlockingQueue<LogBuffer> 
emptyQ, LinkedBlockingQueue<LogBuffer> flushQ,
-            LinkedBlockingQueue<LogBuffer> stashQ) {
+    public LogFlusher(LogManager logMgr, LinkedBlockingQueue<ILogBuffer> 
emptyQ, LinkedBlockingQueue<ILogBuffer> flushQ,
+            LinkedBlockingQueue<ILogBuffer> stashQ) {
         this.logMgr = logMgr;
         this.emptyQ = emptyQ;
         this.flushQ = flushQ;
@@ -663,7 +676,7 @@
         terminateFlag.set(true);
         if (flushPage != null) {
             synchronized (flushPage) {
-                flushPage.isStop(true);
+                flushPage.stop();
                 flushPage.notify();
             }
         }
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
index e3ad3dd..f86eea5 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
@@ -24,6 +24,7 @@
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.asterix.common.replication.IReplicationStrategy;
+import org.apache.asterix.common.transactions.ILogBufferFactory;
 import org.apache.asterix.common.transactions.ILogRecord;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionManager;
@@ -37,8 +38,9 @@
     private final IReplicationStrategy replicationStrategy;
     private final Set<Integer> replicatedJob = ConcurrentHashMap.newKeySet();
 
-    public LogManagerWithReplication(ITransactionSubsystem txnSubsystem, 
IReplicationStrategy replicationStrategy) {
-        super(txnSubsystem);
+    public LogManagerWithReplication(ITransactionSubsystem txnSubsystem, 
ILogBufferFactory logBufferFactory,
+            IReplicationStrategy replicationStrategy) {
+        super(txnSubsystem, logBufferFactory);
         this.replicationStrategy = replicationStrategy;
     }
 
@@ -132,7 +134,7 @@
         } else if (!appendPage.hasSpace(logRecordSize)) {
             prepareNextPage(logRecordSize);
         }
-        appendPage.appendWithReplication(logRecord, appendLSN.get());
+        appendPage.append(logRecord, appendLSN.get());
 
         if (logRecord.getLogType() == LogType.FLUSH) {
             logRecord.setLSN(appendLSN.get());
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/ReplicatingLogBuffer.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/ReplicatingLogBuffer.java
new file mode 100644
index 0000000..0eceb2e
--- /dev/null
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/ReplicatingLogBuffer.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.service.logging;
+
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.transactions.ILogRecord;
+import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.asterix.common.transactions.LogSource;
+import org.apache.asterix.common.transactions.LogType;
+import org.apache.asterix.common.transactions.MutableLong;
+
+public class ReplicatingLogBuffer extends LogBuffer {
+    private static final Logger LOGGER = 
Logger.getLogger(ReplicatingLogBuffer.class.getName());
+
+    public ReplicatingLogBuffer(ITransactionSubsystem txnSubsystem, int 
logPageSize, MutableLong flushLsn) {
+        super(txnSubsystem, logPageSize, flushLsn);
+    }
+
+    @Override
+    public void append(ILogRecord logRecord, long appendLsn) {
+        logRecord.writeLogRecord(appendBuffer);
+
+        if (logRecord.getLogSource() == LogSource.LOCAL && 
logRecord.getLogType() != LogType.FLUSH
+                && logRecord.getLogType() != LogType.WAIT) {
+            logRecord.getTxnCtx().setLastLSN(appendLsn);
+        }
+
+        synchronized (this) {
+            appendOffset += logRecord.getLogSize();
+            if (IS_DEBUG_MODE) {
+                LOGGER.info("append()| appendOffset: " + appendOffset);
+            }
+            if (logRecord.getLogSource() == LogSource.LOCAL) {
+                if (logRecord.getLogType() == LogType.JOB_COMMIT || 
logRecord.getLogType() == LogType.ABORT
+                        || logRecord.getLogType() == LogType.WAIT) {
+                    logRecord.isFlushed(false);
+                    syncCommitQ.offer(logRecord);
+                }
+                if (logRecord.getLogType() == LogType.FLUSH) {
+                    logRecord.isFlushed(false);
+                    flushQ.offer(logRecord);
+                }
+            } else if (logRecord.getLogSource() == LogSource.REMOTE
+                    && (logRecord.getLogType() == LogType.JOB_COMMIT || 
logRecord.getLogType() == LogType.ABORT)) {
+                remoteJobsQ.offer(logRecord);
+            }
+            this.notify();
+        }
+    }
+
+}
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/ReplicatingLogBufferFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/ReplicatingLogBufferFactory.java
new file mode 100644
index 0000000..2abc474
--- /dev/null
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/ReplicatingLogBufferFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.service.logging;
+
+import org.apache.asterix.common.transactions.ILogBuffer;
+import org.apache.asterix.common.transactions.ILogBufferFactory;
+import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.asterix.common.transactions.MutableLong;
+
+public class ReplicatingLogBufferFactory implements ILogBufferFactory {
+    public static final ReplicatingLogBufferFactory INSTANCE = new 
ReplicatingLogBufferFactory();
+
+    private ReplicatingLogBufferFactory() {
+
+    }
+
+    @Override
+    public ILogBuffer create(ITransactionSubsystem txnSubsystem, int 
logPageSize, MutableLong flushLsn) {
+        return new ReplicatingLogBuffer(txnSubsystem, logPageSize, flushLsn);
+    }
+}

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1719
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I9594d381cea9bb21f4ad4841a9357bdb5ba37349
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com>

Reply via email to