abdullah alamoudi has submitted this change and it was merged.

Change subject: Optionally log the before image when it is found in the memory 
component
......................................................................


Optionally log the before image when it is found in the memory component

In addition, this change fixes an issue with one of the test cases for
FrameSpiller.

Change-Id: Iaaed48f4c2ca8d83253e81cd7c60aad998b67b1e
Reviewed-on: https://asterix-gerrit.ics.uci.edu/900
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <michael.b...@couchbase.com>
---
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
M 
asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
M asterixdb/pom.xml
M 
hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/SimpleTupleWriter.java
D 
hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/SimpleTupleWriterFactory.java
M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
18 files changed, 250 insertions(+), 192 deletions(-)

Approvals:
  Michael Blow: Looks good to me, approved
  Jenkins: Verified



diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
index 45e3b06..507a393 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
@@ -132,7 +132,7 @@
     /**
      * Constructor which wraps an IApplicationConfig.
      */
-    public AsterixPropertiesAccessor (IApplicationConfig cfg) {
+    public AsterixPropertiesAccessor(IApplicationConfig cfg) {
         this.cfg = cfg;
         instanceName = cfg.getString("asterix", "instance", 
"DEFAULT_INSTANCE");
         String mdNode = null;
@@ -234,7 +234,8 @@
             return interpreter.interpret(value);
         } catch (IllegalArgumentException e) {
             if (LOGGER.isLoggable(Level.SEVERE)) {
-                StringBuilder msg = new StringBuilder("Invalid property value 
'" + value + "' for property '" + property + "'.\n");
+                StringBuilder msg =
+                        new StringBuilder("Invalid property value '" + value + 
"' for property '" + property + "'.\n");
                 if (p != null) {
                     msg.append("See the description: \n" + p.getDescription() 
+ "\n");
                 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
index 5445b11..9a76b40 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
@@ -48,7 +48,7 @@
     /**
      * The following three variables are used to keep track of the information 
regarding flushing partial frame such as
      * 1. whether there was a partial frame flush for the current frame,
-     * ==> captured in flushedPartialTuples variabl
+     * ==> captured in flushedPartialTuples variable
      * 2. the last flushed tuple index in the frame if there was a partial 
frame flush,
      * ==> captured in lastFlushedTupleIdx variable
      * 3. the current tuple index the frame, where this operator is working on 
the current tuple.
@@ -89,8 +89,8 @@
                 tupleFilter = 
tupleFilterFactory.createTupleFilter(indexHelper.getTaskContext());
                 frameTuple = new FrameTupleReference();
             }
-            IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) 
ctx.getJobletContext()
-                    .getApplicationContext().getApplicationObject();
+            IAsterixAppRuntimeContext runtimeCtx =
+                    (IAsterixAppRuntimeContext) 
ctx.getJobletContext().getApplicationContext().getApplicationObject();
             AsterixLSMIndexUtil.checkAndSetFirstLSN(lsmIndex, 
runtimeCtx.getTransactionSubsystem().getLogManager());
         } catch (Throwable th) {
             throw new HyracksDataException(th);
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
index 1992a00..cd05ba0 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
@@ -78,8 +78,6 @@
 
     public void setNewOp(byte newOp);
 
-    public int getNewValueSize();
-
     public void setNewValueSize(int newValueSize);
 
     public ITupleReference getNewValue();
@@ -134,4 +132,10 @@
     public boolean isReplicated();
 
     public void writeRemoteLogRecord(ByteBuffer buffer);
+
+    public ITupleReference getOldValue();
+
+    public void setOldValue(ITupleReference oldValue);
+
+    public void setOldValueSize(int oldValueSize);
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
index 4823a92..23fdd0f 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
@@ -100,10 +100,13 @@
     private long resourceId;
     private int resourcePartition;
     private int logSize;
-    private int fieldCnt;
+    private int newValueFieldCount;
     private byte newOp;
     private int newValueSize;
     private ITupleReference newValue;
+    private int oldValueSize;
+    private ITupleReference oldValue;
+    private int oldValueFieldCount;
     private long checksum;
     // ------------- fields in a log record (end) --------------//
 
@@ -111,9 +114,9 @@
     private ITransactionContext txnCtx;
     private long LSN;
     private final AtomicBoolean isFlushed;
-    private final SimpleTupleWriter tupleWriter;
     private final PrimaryKeyTupleReference readPKValue;
     private final SimpleTupleReference readNewValue;
+    private final SimpleTupleReference readOldValue;
     private final CRC32 checksumGen;
     private int[] PKFields;
     private PrimaryIndexOperationTracker opTracker;
@@ -128,9 +131,9 @@
 
     public LogRecord() {
         isFlushed = new AtomicBoolean(false);
-        tupleWriter = new SimpleTupleWriter();
         readPKValue = new PrimaryKeyTupleReference();
-        readNewValue = (SimpleTupleReference) 
tupleWriter.createTupleReference();
+        readNewValue = SimpleTupleWriter.INSTANCE.createTupleReference();
+        readOldValue = SimpleTupleWriter.INSTANCE.createTupleReference();
         checksumGen = new CRC32();
         logSource = LogSource.LOCAL;
     }
@@ -152,10 +155,15 @@
         if (logType == LogType.UPDATE) {
             buffer.putLong(resourceId);
             buffer.putInt(logSize);
-            buffer.putInt(fieldCnt);
+            buffer.putInt(newValueFieldCount);
             buffer.put(newOp);
             buffer.putInt(newValueSize);
             writeTuple(buffer, newValue, newValueSize);
+            if (oldValueSize > 0) {
+                buffer.putInt(oldValueSize);
+                buffer.putInt(oldValueFieldCount);
+                writeTuple(buffer, oldValue, oldValueSize);
+            }
         }
         if (logType == LogType.FLUSH) {
             buffer.putInt(datasetId);
@@ -195,7 +203,7 @@
 
     private void writeTuple(ByteBuffer buffer, ITupleReference tuple, int 
size) {
         if (logSource == LogSource.LOCAL) {
-            tupleWriter.writeTuple(tuple, buffer.array(), buffer.position());
+            SimpleTupleWriter.INSTANCE.writeTuple(tuple, buffer.array(), 
buffer.position());
         } else {
             //since the tuple is already serialized in remote logs, just copy 
it from beginning to end.
             System.arraycopy(tuple.getFieldData(0), 0, buffer.array(), 
buffer.position(), size);
@@ -241,62 +249,91 @@
         logSource = buffer.get();
         logType = buffer.get();
         jobId = buffer.getInt();
-
-        if (logType == LogType.FLUSH) {
-            if (buffer.remaining() < DatasetId.BYTES) {
-                return RecordReadStatus.TRUNCATED;
-            }
-            datasetId = buffer.getInt();
-            resourceId = 0l;
-            computeAndSetLogSize();
-        } else if (logType == LogType.WAIT) {
-            computeAndSetLogSize();
-        } else {
-            if (logType == LogType.JOB_COMMIT || logType == LogType.ABORT) {
+        switch (logType) {
+            case LogType.FLUSH:
+                if (buffer.remaining() < DatasetId.BYTES) {
+                    return RecordReadStatus.TRUNCATED;
+                }
+                datasetId = buffer.getInt();
+                resourceId = 0L;
+                break;
+            case LogType.ABORT:
+            case LogType.JOB_COMMIT:
                 datasetId = -1;
                 PKHashValue = -1;
-            } else {
-                //attempt to read in the resourcePartition, dsid, PK hash and 
PK length
-                if (buffer.remaining() < ENTITYCOMMIT_UPDATE_HEADER_LEN) {
+                break;
+            case LogType.ENTITY_COMMIT:
+            case LogType.UPSERT_ENTITY_COMMIT:
+                if (!readEntityInfo(buffer)) {
                     return RecordReadStatus.TRUNCATED;
                 }
-                resourcePartition = buffer.getInt();
-                datasetId = buffer.getInt();
-                PKHashValue = buffer.getInt();
-                PKValueSize = buffer.getInt();
-                // attempt to read in the PK
-                if (buffer.remaining() < PKValueSize) {
+                break;
+            case LogType.UPDATE:
+                if (!readEntityInfo(buffer)) {
                     return RecordReadStatus.TRUNCATED;
                 }
-                if (PKValueSize <= 0) {
-                    throw new IllegalStateException("Primary Key Size is less 
than or equal to 0");
-                }
-                PKValue = readPKValue(buffer);
-            }
-
-            if (logType == LogType.UPDATE) {
-                // attempt to read in the previous LSN, log size, new value 
size, and new record type
                 if (buffer.remaining() < UPDATE_LSN_HEADER + 
UPDATE_BODY_HEADER) {
                     return RecordReadStatus.TRUNCATED;
                 }
                 resourceId = buffer.getLong();
                 logSize = buffer.getInt();
-                fieldCnt = buffer.getInt();
+                newValueFieldCount = buffer.getInt();
                 newOp = buffer.get();
                 newValueSize = buffer.getInt();
-                if (buffer.remaining() < newValueSize) {
-                    if (logSize > buffer.capacity()) {
-                        return RecordReadStatus.LARGE_RECORD;
-                    }
-                    return RecordReadStatus.TRUNCATED;
-                }
-                newValue = readTuple(buffer, readNewValue, fieldCnt, 
newValueSize);
-            } else {
-                computeAndSetLogSize();
-            }
+                return readEntity(buffer);
+            default:
+                break;
         }
-
+        computeAndSetLogSize();
         return RecordReadStatus.OK;
+    }
+
+    private RecordReadStatus readEntity(ByteBuffer buffer) {
+        if (buffer.remaining() < newValueSize) {
+            if (logSize > buffer.capacity()) {
+                return RecordReadStatus.LARGE_RECORD;
+            }
+            return RecordReadStatus.TRUNCATED;
+        }
+        newValue = readTuple(buffer, readNewValue, newValueFieldCount, 
newValueSize);
+        if (logSize > getUpdateLogSizeWithoutOldValue()) {
+            // Prev Image exists
+            if (buffer.remaining() < Integer.BYTES) {
+                return RecordReadStatus.TRUNCATED;
+            }
+            oldValueSize = buffer.getInt();
+            if (buffer.remaining() < Integer.BYTES) {
+                return RecordReadStatus.TRUNCATED;
+            }
+            oldValueFieldCount = buffer.getInt();
+            if (buffer.remaining() < oldValueSize) {
+                return RecordReadStatus.TRUNCATED;
+            }
+            oldValue = readTuple(buffer, readOldValue, oldValueFieldCount, 
oldValueSize);
+        } else {
+            oldValueSize = 0;
+        }
+        return RecordReadStatus.OK;
+    }
+
+    private boolean readEntityInfo(ByteBuffer buffer) {
+        //attempt to read in the resourcePartition, dsid, PK hash and PK length
+        if (buffer.remaining() < ENTITYCOMMIT_UPDATE_HEADER_LEN) {
+            return false;
+        }
+        resourcePartition = buffer.getInt();
+        datasetId = buffer.getInt();
+        PKHashValue = buffer.getInt();
+        PKValueSize = buffer.getInt();
+        // attempt to read in the PK
+        if (buffer.remaining() < PKValueSize) {
+            return false;
+        }
+        if (PKValueSize <= 0) {
+            throw new IllegalStateException("Primary Key Size is less than or 
equal to 0");
+        }
+        PKValue = readPKValue(buffer);
+        return true;
     }
 
     @Override
@@ -345,7 +382,14 @@
     }
 
     private void setUpdateLogSize() {
-        logSize = UPDATE_LOG_BASE_SIZE + PKValueSize + newValueSize;
+        logSize = getUpdateLogSizeWithoutOldValue();
+        if (oldValueSize > 0) {
+            logSize += /*size*/Integer.BYTES + /*fieldCount*/Integer.BYTES + 
/*tuple*/oldValueSize;
+        }
+    }
+
+    private int getUpdateLogSizeWithoutOldValue() {
+        return UPDATE_LOG_BASE_SIZE + PKValueSize + newValueSize;
     }
 
     @Override
@@ -504,11 +548,6 @@
     }
 
     @Override
-    public int getNewValueSize() {
-        return newValueSize;
-    }
-
-    @Override
     public void setNewValueSize(int newValueSize) {
         this.newValueSize = newValueSize;
     }
@@ -521,7 +560,7 @@
     @Override
     public void setNewValue(ITupleReference newValue) {
         this.newValue = newValue;
-        this.fieldCnt = newValue.getFieldCount();
+        this.newValueFieldCount = newValue.getFieldCount();
     }
 
     @Override
@@ -633,4 +672,20 @@
     public boolean isReplicated() {
         return replicated;
     }
+
+    @Override
+    public ITupleReference getOldValue() {
+        return oldValue;
+    }
+
+    @Override
+    public void setOldValue(ITupleReference oldValue) {
+        this.oldValue = oldValue;
+        this.oldValueFieldCount = oldValue.getFieldCount();
+    }
+
+    @Override
+    public void setOldValueSize(int oldValueSize) {
+        this.oldValueSize = oldValueSize;
+    }
 }
diff --git 
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
 
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
index 3becd96..bc1c328 100644
--- 
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
+++ 
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
@@ -679,47 +679,50 @@
      */
     @org.junit.Test
     public void testMemoryVarSizeFrameWithSpillNoDiscard() {
-        try {
-            Random random = new Random();
-            IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
-            // Spill budget = Memory budget, No discard
-            FeedPolicyAccessor fpa =
-                    createFeedPolicyAccessor(true, false, DEFAULT_FRAME_SIZE * 
NUM_FRAMES, DISCARD_ALLOWANCE);
-            // Non-Active Writer
-            TestControlledFrameWriter writer = 
FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
-            writer.freeze();
-            // FramePool
-            ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, 
FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
-            FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, 
fpa, framePool);
-            handler.open();
-            ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
-            int multiplier = 1;
-            // add NUM_FRAMES times
-            while ((multiplier <= framePool.remaining())) {
-                handler.nextFrame(buffer);
-                multiplier = random.nextInt(10) + 1;
-                buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * multiplier);
+        for (int k = 0; k < 1000; k++) {
+            try {
+                Random random = new Random();
+                IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+                // Spill budget = Memory budget, No discard
+                FeedPolicyAccessor fpa =
+                        createFeedPolicyAccessor(true, false, 
DEFAULT_FRAME_SIZE * NUM_FRAMES, DISCARD_ALLOWANCE);
+                // Non-Active Writer
+                TestControlledFrameWriter writer = 
FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
+                writer.freeze();
+                // FramePool
+                ConcurrentFramePool framePool = new 
ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
+                FeedRuntimeInputHandler handler = createInputHandler(ctx, 
writer, fpa, framePool);
+                handler.open();
+                ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
+                int multiplier = 1;
+                // add NUM_FRAMES times
+                while ((multiplier <= framePool.remaining())) {
+                    handler.nextFrame(buffer);
+                    multiplier = random.nextInt(10) + 1;
+                    buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * 
multiplier);
+                }
+                // Next call should Not block. we will do it in a different 
thread
+                Future<?> result = EXECUTOR.submit(new Pusher(buffer, 
handler));
+                result.get();
+                // Check that no records were discarded
+                assertEquals(handler.getNumDiscarded(), 0);
+                // Check that one frame is spilled
+                assertEquals(handler.getNumSpilled(), 1);
+                int numOfBuffersInMemory = handler.getInternalBuffer().size();
+                // consume memory frames
+                while (numOfBuffersInMemory > 0) {
+                    writer.kick();
+                    numOfBuffersInMemory--;
+                }
+                // There should be 1 frame on disk
+                Assert.assertEquals(1, handler.framesOnDisk());
+                writer.unfreeze();
+                handler.close();
+                Assert.assertEquals(0, handler.framesOnDisk());
+            } catch (Throwable th) {
+                th.printStackTrace();
+                Assert.fail();
             }
-            // Next call should Not block. we will do it in a different thread
-            Future<?> result = EXECUTOR.submit(new Pusher(buffer, handler));
-            result.get();
-            // Check that no records were discarded
-            assertEquals(handler.getNumDiscarded(), 0);
-            // Check that one frame is spilled
-            assertEquals(handler.getNumSpilled(), 1);
-            // consume memory frames
-            while (!handler.getInternalBuffer().isEmpty()) {
-                writer.kick();
-            }
-            // There should be 1 frame on disk
-            Assert.assertEquals(1, handler.framesOnDisk());
-            writer.unfreeze();
-            result.get();
-            handler.close();
-            Assert.assertEquals(0, handler.framesOnDisk());
-        } catch (Throwable th) {
-            th.printStackTrace();
-            Assert.fail();
         }
         Assert.assertNull(cause);
     }
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index b82c9ee..3b432a6 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -315,7 +315,7 @@
         // locks and secondary index doesn't.
         return new 
SecondaryIndexModificationOperationCallback(metadataIndex.getDatasetId().getId(),
                 metadataIndex.getPrimaryKeyIndexes(), txnCtx, 
transactionSubsystem.getLockManager(),
-                transactionSubsystem, resourceId, metadataStoragePartition, 
ResourceType.LSM_BTREE, indexOp);
+                transactionSubsystem, resourceId, metadataStoragePartition, 
ResourceType.LSM_BTREE, indexOp, false);
     }
 
     @Override
@@ -996,10 +996,8 @@
             try {
                 while (rangeCursor.hasNext()) {
                     rangeCursor.next();
-                    sb.append(TupleUtils.printTuple(rangeCursor.getTuple(),
-                            new ISerializerDeserializer[] {
-                                    
AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(
-                                            BuiltinType.ASTRING),
+                    sb.append(TupleUtils.printTuple(rangeCursor.getTuple(), 
new ISerializerDeserializer[] {
+                            
AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING),
                             
AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING),
                             AqlSerializerDeserializerProvider.INSTANCE
                                     
.getSerializerDeserializer(BuiltinType.ASTRING) }));
@@ -1016,7 +1014,7 @@
 
     private <ResultType> void searchIndex(JobId jobId, IMetadataIndex index, 
ITupleReference searchKey,
             IValueExtractor<ResultType> valueExtractor, List<ResultType> 
results)
-                    throws MetadataException, IndexException, IOException {
+            throws MetadataException, IndexException, IOException {
         IBinaryComparatorFactory[] comparatorFactories = 
index.getKeyBinaryComparatorFactory();
         String resourceName = index.getFile().toString();
         IIndex indexInstance = datasetLifecycleManager.getIndex(resourceName);
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
index 28f8a79..323ea6d 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
@@ -824,8 +824,8 @@
 
             RTreeSearchOperatorDescriptor rtreeSearchOp;
             if (dataset.getDatasetType() == DatasetType.INTERNAL) {
-                IBinaryComparatorFactory[] deletedKeyBTreeCompFactories = 
getMergedComparatorFactories(
-                        comparatorFactories, primaryComparatorFactories);
+                IBinaryComparatorFactory[] deletedKeyBTreeCompFactories =
+                        getMergedComparatorFactories(comparatorFactories, 
primaryComparatorFactories);
                 IIndexDataflowHelperFactory idff = new 
LSMRTreeWithAntiMatterTuplesDataflowHelperFactory(
                         valueProviderFactories, RTreePolicyType.RTREE, 
deletedKeyBTreeCompFactories,
                         new 
AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), compactionInfo.first,
@@ -1135,7 +1135,7 @@
                     ? new 
TempDatasetPrimaryIndexModificationOperationCallbackFactory(jobId, datasetId,
                             primaryKeyFields, txnSubsystemProvider, indexOp, 
ResourceType.LSM_BTREE)
                     : new 
PrimaryIndexModificationOperationCallbackFactory(jobId, datasetId, 
primaryKeyFields,
-                            txnSubsystemProvider, indexOp, 
ResourceType.LSM_BTREE);
+                            txnSubsystemProvider, indexOp, 
ResourceType.LSM_BTREE, dataset.hasMetaPart());
 
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                     DatasetUtils.getMergePolicyFactory(dataset, mdTxnCtx);
@@ -1614,8 +1614,8 @@
                     ? new 
TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
                             modificationCallbackPrimaryKeyFields, 
txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE)
                     : new 
SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, 
txnSubsystemProvider, indexOp,
-                            ResourceType.LSM_BTREE);
+                            modificationCallbackPrimaryKeyFields, 
txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE,
+                            dataset.hasMetaPart());
 
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                     DatasetUtils.getMergePolicyFactory(dataset, mdTxnCtx);
@@ -1819,7 +1819,7 @@
                             ResourceType.LSM_INVERTED_INDEX)
                     : new 
SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
                             modificationCallbackPrimaryKeyFields, 
txnSubsystemProvider, indexOp,
-                            ResourceType.LSM_INVERTED_INDEX);
+                            ResourceType.LSM_INVERTED_INDEX, 
dataset.hasMetaPart());
 
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                     DatasetUtils.getMergePolicyFactory(dataset, mdTxnCtx);
@@ -1889,8 +1889,8 @@
             Pair<IAType, Boolean> keyPairType =
                     
Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0), 
secondaryKeyExprs.get(0), recType);
             IAType spatialType = keyPairType.first;
-            boolean isPointMBR = spatialType.getTypeTag() == ATypeTag.POINT
-                    || spatialType.getTypeTag() == ATypeTag.POINT3D;
+            boolean isPointMBR =
+                    spatialType.getTypeTag() == ATypeTag.POINT || 
spatialType.getTypeTag() == ATypeTag.POINT3D;
             int dimension = 
NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
             int numSecondaryKeys = dimension * 2;
             int numPrimaryKeys = primaryKeys.size();
@@ -1969,14 +1969,14 @@
                     ? new 
TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
                             modificationCallbackPrimaryKeyFields, 
txnSubsystemProvider, indexOp, ResourceType.LSM_RTREE)
                     : new 
SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, 
txnSubsystemProvider, indexOp,
-                            ResourceType.LSM_RTREE);
+                            modificationCallbackPrimaryKeyFields, 
txnSubsystemProvider, indexOp, ResourceType.LSM_RTREE,
+                            dataset.hasMetaPart());
 
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                     DatasetUtils.getMergePolicyFactory(dataset, mdTxnCtx);
 
-            IBinaryComparatorFactory[] deletedKeyBTreeCompFactories = 
getMergedComparatorFactories(comparatorFactories,
-                    primaryComparatorFactories);
+            IBinaryComparatorFactory[] deletedKeyBTreeCompFactories =
+                    getMergedComparatorFactories(comparatorFactories, 
primaryComparatorFactories);
             IIndexDataflowHelperFactory idff = new 
LSMRTreeWithAntiMatterTuplesDataflowHelperFactory(
                     valueProviderFactories, RTreePolicyType.RTREE, 
deletedKeyBTreeCompFactories,
                     new 
AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), compactionInfo.first,
@@ -2300,7 +2300,7 @@
                     ? new 
TempDatasetPrimaryIndexModificationOperationCallbackFactory(jobId, datasetId,
                             primaryKeyFields, txnSubsystemProvider, 
IndexOperation.UPSERT, ResourceType.LSM_BTREE)
                     : new UpsertOperationCallbackFactory(jobId, datasetId, 
primaryKeyFields, txnSubsystemProvider,
-                            IndexOperation.UPSERT, ResourceType.LSM_BTREE);
+                            IndexOperation.UPSERT, ResourceType.LSM_BTREE, 
dataset.hasMetaPart());
 
             LockThenSearchOperationCallbackFactory searchCallbackFactory = new 
LockThenSearchOperationCallbackFactory(
                     jobId, datasetId, primaryKeyFields, txnSubsystemProvider, 
ResourceType.LSM_BTREE);
@@ -2601,7 +2601,7 @@
                             ResourceType.LSM_INVERTED_INDEX)
                     : new 
SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
                             modificationCallbackPrimaryKeyFields, 
txnSubsystemProvider, IndexOperation.UPSERT,
-                            ResourceType.LSM_INVERTED_INDEX);
+                            ResourceType.LSM_INVERTED_INDEX, 
dataset.hasMetaPart());
 
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                     DatasetUtils.getMergePolicyFactory(dataset, mdTxnCtx);
@@ -2666,8 +2666,8 @@
                     
Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0), 
secondaryKeyExprs.get(0), recType);
             IAType spatialType = keyPairType.first;
 
-            boolean isPointMBR = spatialType.getTypeTag() == ATypeTag.POINT
-                    || spatialType.getTypeTag() == ATypeTag.POINT3D;
+            boolean isPointMBR =
+                    spatialType.getTypeTag() == ATypeTag.POINT || 
spatialType.getTypeTag() == ATypeTag.POINT3D;
             int dimension = 
NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
             int numSecondaryKeys = dimension * 2;
             int numPrimaryKeys = primaryKeys.size();
@@ -2770,12 +2770,12 @@
                             ResourceType.LSM_RTREE)
                     : new 
SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
                             modificationCallbackPrimaryKeyFields, 
txnSubsystemProvider, IndexOperation.UPSERT,
-                            ResourceType.LSM_RTREE);
+                            ResourceType.LSM_RTREE, dataset.hasMetaPart());
 
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = 
DatasetUtils
-                    .getMergePolicyFactory(dataset, mdTxnCtx);
-            IBinaryComparatorFactory[] deletedKeyBTreeCompFactories = 
getMergedComparatorFactories(comparatorFactories,
-                    primaryComparatorFactories);
+            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
+                    DatasetUtils.getMergePolicyFactory(dataset, mdTxnCtx);
+            IBinaryComparatorFactory[] deletedKeyBTreeCompFactories =
+                    getMergedComparatorFactories(comparatorFactories, 
primaryComparatorFactories);
             IIndexDataflowHelperFactory idff = new 
LSMRTreeWithAntiMatterTuplesDataflowHelperFactory(
                     valueProviderFactories, RTreePolicyType.RTREE, 
deletedKeyBTreeCompFactories,
                     new 
AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), compactionInfo.first,
@@ -2922,7 +2922,7 @@
                             ResourceType.LSM_BTREE)
                     : new 
SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
                             modificationCallbackPrimaryKeyFields, 
txnSubsystemProvider, IndexOperation.UPSERT,
-                            ResourceType.LSM_BTREE);
+                            ResourceType.LSM_BTREE, dataset.hasMetaPart());
 
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                     DatasetUtils.getMergePolicyFactory(dataset, mdTxnCtx);
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
index 65c9a49..2a3467e 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
@@ -40,7 +40,6 @@
     protected final byte resourceType;
     protected final IndexOperation indexOp;
     protected final ITransactionSubsystem txnSubsystem;
-    protected final SimpleTupleWriter tupleWriter;
     protected final ILogRecord logRecord;
 
     protected AbstractIndexModificationOperationCallback(int datasetId, int[] 
primaryKeyFields,
@@ -51,7 +50,6 @@
         this.resourceType = resourceType;
         this.indexOp = indexOp;
         this.txnSubsystem = txnSubsystem;
-        tupleWriter = new SimpleTupleWriter();
         logRecord = new LogRecord();
         logRecord.setTxnCtx(txnCtx);
         logRecord.setLogType(LogType.UPDATE);
@@ -62,17 +60,23 @@
         logRecord.setNewOp((byte) (indexOp.ordinal()));
     }
 
-    protected void log(int PKHash, ITupleReference newValue) throws ACIDException {
+    protected void log(int PKHash, ITupleReference newValue, ITupleReference oldValue) throws ACIDException {
         logRecord.setPKHashValue(PKHash);
         logRecord.setPKFields(primaryKeyFields);
         logRecord.setPKValue(newValue);
         logRecord.computeAndSetPKValueSize();
         if (newValue != null) {
-            logRecord.setNewValueSize(tupleWriter.bytesRequired(newValue));
+            logRecord.setNewValueSize(SimpleTupleWriter.INSTANCE.bytesRequired(newValue));
             logRecord.setNewValue(newValue);
         } else {
             logRecord.setNewValueSize(0);
         }
+        if (oldValue != null) {
+            logRecord.setOldValueSize(SimpleTupleWriter.INSTANCE.bytesRequired(oldValue));
+            logRecord.setOldValue(oldValue);
+        } else {
+            logRecord.setOldValueSize(0);
+        }
         logRecord.computeAndSetLogSize();
         txnSubsystem.getLogManager().log(logRecord);
     }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
index 4bde490..5b89bb5 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
@@ -40,13 +40,16 @@
         implements IModificationOperationCallback {
 
     private final AsterixLSMInsertDeleteOperatorNodePushable 
operatorNodePushable;
+    private final boolean logBeforeImage;
 
     public PrimaryIndexModificationOperationCallback(int datasetId, int[] 
primaryKeyFields, ITransactionContext txnCtx,
             ILockManager lockManager, ITransactionSubsystem txnSubsystem, long 
resourceId, int resourcePartition,
-            byte resourceType, IndexOperation indexOp, IOperatorNodePushable 
operatorNodePushable) {
+            byte resourceType, IndexOperation indexOp, IOperatorNodePushable 
operatorNodePushable,
+            boolean logBeforeImage) {
         super(datasetId, primaryKeyFields, txnCtx, lockManager, txnSubsystem, 
resourceId, resourcePartition,
                 resourceType, indexOp);
         this.operatorNodePushable = 
(AsterixLSMInsertDeleteOperatorNodePushable) operatorNodePushable;
+        this.logBeforeImage = logBeforeImage;
     }
 
     @Override
@@ -99,7 +102,11 @@
     public void found(ITupleReference before, ITupleReference after) throws 
HyracksDataException {
         try {
             int pkHash = computePrimaryKeyHashValue(after, primaryKeyFields);
-            log(pkHash, after);
+            if (logBeforeImage) {
+                log(pkHash, after, before);
+            } else {
+                log(pkHash, after, null);
+            }
         } catch (ACIDException e) {
             throw new HyracksDataException(e);
         }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
index c406812..52e2818 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
@@ -43,11 +43,14 @@
 
     private static final long serialVersionUID = 1L;
     private final IndexOperation indexOp;
+    private final boolean logBeforeImage;
 
     public PrimaryIndexModificationOperationCallbackFactory(JobId jobId, int 
datasetId, int[] primaryKeyFields,
-            ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation 
indexOp, byte resourceType) {
+            ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation 
indexOp, byte resourceType,
+            boolean logBeforeImage) {
         super(jobId, datasetId, primaryKeyFields, txnSubsystemProvider, 
resourceType);
         this.indexOp = indexOp;
+        this.logBeforeImage = logBeforeImage;
     }
 
     @Override
@@ -56,8 +59,8 @@
             throws HyracksDataException {
 
         ITransactionSubsystem txnSubsystem = 
txnSubsystemProvider.getTransactionSubsystem(ctx);
-        IIndexLifecycleManager indexLifeCycleManager = 
txnSubsystem.getAsterixAppRuntimeContextProvider()
-                .getDatasetLifecycleManager();
+        IIndexLifecycleManager indexLifeCycleManager =
+                
txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager();
         ILSMIndex index = (ILSMIndex) 
indexLifeCycleManager.getIndex(resourcePath);
         if (index == null) {
             throw new HyracksDataException("Index(id:" + resourceId + ") is 
not registered.");
@@ -67,7 +70,7 @@
             ITransactionContext txnCtx = 
txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
             IModificationOperationCallback modCallback = new 
PrimaryIndexModificationOperationCallback(datasetId,
                     primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), 
txnSubsystem, resourceId,
-                    resourcePartition, resourceType, indexOp, 
operatorNodePushable);
+                    resourcePartition, resourceType, indexOp, 
operatorNodePushable, logBeforeImage);
             txnCtx.registerIndexAndCallback(resourceId, index, 
(AbstractOperationCallback) modCallback, true);
             return modCallback;
         } catch (ACIDException e) {
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
index 8044d90..974e631 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
@@ -37,13 +37,15 @@
         implements IModificationOperationCallback {
 
     protected final IndexOperation oldOp;
+    private final boolean logBeforeImage;
 
     public SecondaryIndexModificationOperationCallback(int datasetId, int[] 
primaryKeyFields,
             ITransactionContext txnCtx, ILockManager lockManager, 
ITransactionSubsystem txnSubsystem, long resourceId,
-            int resourcePartition, byte resourceType, IndexOperation indexOp) {
+            int resourcePartition, byte resourceType, IndexOperation indexOp, 
boolean logBeforeImage) {
         super(datasetId, primaryKeyFields, txnCtx, lockManager, txnSubsystem, 
resourceId, resourcePartition,
                 resourceType, indexOp);
         oldOp = (indexOp == IndexOperation.DELETE) ? IndexOperation.INSERT : 
IndexOperation.DELETE;
+        this.logBeforeImage = logBeforeImage;
     }
 
     @Override
@@ -55,7 +57,7 @@
     public void found(ITupleReference before, ITupleReference after) throws 
HyracksDataException {
         try {
             int pkHash = computePrimaryKeyHashValue(after, primaryKeyFields);
-            this.log(pkHash, after);
+            this.log(pkHash, after, logBeforeImage ? before : null);
         } catch (ACIDException e) {
             throw new HyracksDataException(e);
         }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
index 168da99..c6743dd 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
@@ -40,11 +40,14 @@
 
     private static final long serialVersionUID = 1L;
     private final IndexOperation indexOp;
+    private final boolean logBeforeImage;
 
     public SecondaryIndexModificationOperationCallbackFactory(JobId jobId, int 
datasetId, int[] primaryKeyFields,
-            ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation 
indexOp, byte resourceType) {
+            ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation 
indexOp, byte resourceType,
+            boolean logBeforeImage) {
         super(jobId, datasetId, primaryKeyFields, txnSubsystemProvider, 
resourceType);
         this.indexOp = indexOp;
+        this.logBeforeImage = logBeforeImage;
     }
 
     @Override
@@ -63,7 +66,7 @@
             ITransactionContext txnCtx = 
txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
             IModificationOperationCallback modCallback = new 
SecondaryIndexModificationOperationCallback(datasetId,
                     primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), 
txnSubsystem, resourceId,
-                    resourcePartition, resourceType, indexOp);
+                    resourcePartition, resourceType, indexOp, logBeforeImage);
             txnCtx.registerIndexAndCallback(resourceId, index, 
(AbstractOperationCallback) modCallback, false);
             return modCallback;
         } catch (ACIDException e) {
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java
index f98083a..13d2d57 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java
@@ -29,12 +29,14 @@
 
 public class UpsertOperationCallback extends 
AbstractIndexModificationOperationCallback
         implements IModificationOperationCallback {
+    private final boolean logBeforeImage;
 
     public UpsertOperationCallback(int datasetId, int[] primaryKeyFields, 
ITransactionContext txnCtx,
             ILockManager lockManager, ITransactionSubsystem txnSubsystem, long 
resourceId, int resourcePartition,
-            byte resourceType, IndexOperation indexOp) {
+            byte resourceType, IndexOperation indexOp, boolean logBeforeImage) 
{
         super(datasetId, primaryKeyFields, txnCtx, lockManager, txnSubsystem, 
resourceId, resourcePartition,
                 resourceType, indexOp);
+        this.logBeforeImage = logBeforeImage;
     }
 
     @Override
@@ -46,7 +48,7 @@
     public void found(ITupleReference before, ITupleReference after) throws 
HyracksDataException {
         try {
             int pkHash = computePrimaryKeyHashValue(after, primaryKeyFields);
-            log(pkHash, after);
+            log(pkHash, after, logBeforeImage ? before : null);
         } catch (ACIDException e) {
             throw new HyracksDataException(e);
         }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
index 87cb8e7..5bf1505 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
@@ -39,11 +39,14 @@
 
     private static final long serialVersionUID = 1L;
     private final IndexOperation indexOp;
+    private final boolean logBeforeImage;
 
     public UpsertOperationCallbackFactory(JobId jobId, int datasetId, int[] 
primaryKeyFields,
-            ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation 
indexOp, byte resourceType) {
+            ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation 
indexOp, byte resourceType,
+            boolean logBeforeImage) {
         super(jobId, datasetId, primaryKeyFields, txnSubsystemProvider, 
resourceType);
         this.indexOp = indexOp;
+        this.logBeforeImage = logBeforeImage;
     }
 
     @Override
@@ -52,8 +55,8 @@
             throws HyracksDataException {
 
         ITransactionSubsystem txnSubsystem = 
txnSubsystemProvider.getTransactionSubsystem(ctx);
-        IIndexLifecycleManager indexLifeCycleManager = 
txnSubsystem.getAsterixAppRuntimeContextProvider()
-                .getDatasetLifecycleManager();
+        IIndexLifecycleManager indexLifeCycleManager =
+                
txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager();
         ILSMIndex index = (ILSMIndex) 
indexLifeCycleManager.getIndex(resourceName);
         if (index == null) {
             throw new HyracksDataException("Index(id:" + resourceId + ") is 
not registered.");
@@ -61,9 +64,9 @@
 
         try {
             ITransactionContext txnCtx = 
txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
-            IModificationOperationCallback modCallback = new 
UpsertOperationCallback(datasetId, primaryKeyFields,
-                    txnCtx, txnSubsystem.getLockManager(), txnSubsystem, 
resourceId, resourcePartition, resourceType,
-                    indexOp);
+            IModificationOperationCallback modCallback =
+                    new UpsertOperationCallback(datasetId, primaryKeyFields, 
txnCtx, txnSubsystem.getLockManager(),
+                            txnSubsystem, resourceId, resourcePartition, 
resourceType, indexOp, logBeforeImage);
             txnCtx.registerIndexAndCallback(resourceId, index, 
(AbstractOperationCallback) modCallback, true);
             return modCallback;
         } catch (ACIDException e) {
diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml
index f474690..7f5c612 100644
--- a/asterixdb/pom.xml
+++ b/asterixdb/pom.xml
@@ -199,7 +199,7 @@
                   <pluginExecutionFilter>
                     <groupId>org.apache.maven.plugins</groupId>
                     <artifactId>maven-plugin-plugin</artifactId>
-                    <versionRange>[3.4,)</versionRange>
+                    <versionRange>[3.3,)</versionRange>
                     <goals>
                       <goal>descriptor</goal>
                     </goals>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/SimpleTupleWriter.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/SimpleTupleWriter.java
index 11b0010..7aaa983 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/SimpleTupleWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/SimpleTupleWriter.java
@@ -22,10 +22,17 @@
 import java.nio.ByteBuffer;
 
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleReference;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
 
+/*
+ * This class should be replaced by a Util class
+ */
 public class SimpleTupleWriter implements ITreeIndexTupleWriter {
+
+    public static final SimpleTupleWriter INSTANCE = new SimpleTupleWriter();
+
+    private SimpleTupleWriter() {
+    }
 
     // Write short in little endian to target byte array at given offset.
     private static void writeShortL(short s, byte[] buf, int targetOff) {
@@ -52,7 +59,7 @@
     }
 
     @Override
-    public ITreeIndexTupleReference createTupleReference() {
+    public SimpleTupleReference createTupleReference() {
         return new SimpleTupleReference();
     }
 
@@ -103,7 +110,7 @@
     }
 
     protected int getNullFlagsBytes(ITupleReference tuple) {
-        return (int) Math.ceil((double) tuple.getFieldCount() / 8.0);
+        return (int) Math.ceil(tuple.getFieldCount() / 8.0);
     }
 
     protected int getFieldSlotsBytes(ITupleReference tuple) {
@@ -111,7 +118,7 @@
     }
 
     protected int getNullFlagsBytes(ITupleReference tuple, int startField, int 
numFields) {
-        return (int) Math.ceil((double) numFields / 8.0);
+        return (int) Math.ceil(numFields / 8.0);
     }
 
     protected int getFieldSlotsBytes(ITupleReference tuple, int startField, 
int numFields) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/SimpleTupleWriterFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/SimpleTupleWriterFactory.java
deleted file mode 100644
index be0688a..0000000
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/SimpleTupleWriterFactory.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hyracks.storage.am.common.tuples;
-
-import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
-import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleWriterFactory;
-
-public class SimpleTupleWriterFactory implements ITreeIndexTupleWriterFactory {
-
-       private static final long serialVersionUID = 1L;
-
-       @Override
-       public ITreeIndexTupleWriter createTupleWriter() {
-               return new SimpleTupleWriter();
-       }
-
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index b58cc29..8ddab88 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -511,8 +511,8 @@
 
     protected void triggerReplication(List<ILSMComponent> lsmComponents, 
boolean bulkload, LSMOperationType opType)
             throws HyracksDataException {
-        ILSMIndexAccessorInternal accessor = 
lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE,
-                NoOpOperationCallback.INSTANCE);
+        ILSMIndexAccessorInternal accessor =
+                lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, 
NoOpOperationCallback.INSTANCE);
         accessor.scheduleReplication(lsmComponents, bulkload, opType);
     }
 

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/900
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: Iaaed48f4c2ca8d83253e81cd7c60aad998b67b1e
Gerrit-PatchSet: 8
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com>
Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <michael.b...@couchbase.com>
Gerrit-Reviewer: abdullah alamoudi <bamou...@gmail.com>

Reply via email to