abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1554

Change subject: Fix transaction logs and optimize upserts
......................................................................

Fix transaction logs and optimize upserts

Change-Id: Ice5296267033cd7debe76894c864c6411f761d83
---
M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
M asterixdb/asterix-transactions/pom.xml
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
D asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/UpsertCommitRuntime.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputPushRuntime.java
M hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallback.java
M hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallback.java
M hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/IndexOperation.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
M hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/btree/AbstractModificationOperationCallbackTest.java
M hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/TestOperationCallback.java
36 files changed, 118 insertions(+), 219 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/54/1554/1

diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
index d0cee55..20c69c4 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
@@ -46,15 +46,12 @@
     private final List<LogicalVariable> primaryKeyLogicalVars;
     private final JobId jobId;
     private final Dataset dataset;
-    private final LogicalVariable upsertVar;
     private final boolean isSink;
 
-    public CommitPOperator(JobId jobId, Dataset dataset, List<LogicalVariable> 
primaryKeyLogicalVars,
-            LogicalVariable upsertVar, boolean isSink) {
+    public CommitPOperator(JobId jobId, Dataset dataset, List<LogicalVariable> 
primaryKeyLogicalVars, boolean isSink) {
         this.jobId = jobId;
         this.dataset = dataset;
         this.primaryKeyLogicalVars = primaryKeyLogicalVars;
-        this.upsertVar = upsertVar;
         this.isSink = isSink;
     }
 
@@ -98,12 +95,8 @@
         for (int i = 0; i < splitsForDataset.length; i++) {
             datasetPartitions[i] = i;
         }
-        int upsertVarIdx = -1;
-        if (upsertVar != null) {
-            upsertVarIdx = inputSchemas[0].findVariable(upsertVar);
-        }
         IPushRuntimeFactory runtime = dataset.getCommitRuntimeFactory(jobId, 
primaryKeyFields, metadataProvider,
-                upsertVarIdx, datasetPartitions, isSink);
+                datasetPartitions, isSink);
         builder.contributeMicroOperator(op, runtime, recDesc);
         ILogicalOperator src = op.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, op, 0);
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
index 9b442ae..47a37d1 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
@@ -140,7 +140,7 @@
         //create the logical and physical operator
         CommitOperator commitOperator = new 
CommitOperator(primaryKeyLogicalVars, upsertVar, isSink);
         CommitPOperator commitPOperator =
-                new CommitPOperator(jobId, dataset, primaryKeyLogicalVars, 
upsertVar, isSink);
+                new CommitPOperator(jobId, dataset, primaryKeyLogicalVars, 
isSink);
         commitOperator.setPhysicalOperator(commitPOperator);
 
         //create ExtensionOperator and put the commitOperator in it.
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 4ee1122..b2e8640 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -98,8 +98,9 @@
         this.appCtx = appCtx;
         this.txnSubsystem = txnSubsystem;
         logMgr = (LogManager) txnSubsystem.getLogManager();
-        ReplicationProperties repProperties = ((IPropertiesProvider) 
txnSubsystem.getAsterixAppRuntimeContextProvider()
-                .getAppContext()).getReplicationProperties();
+        ReplicationProperties repProperties =
+                ((IPropertiesProvider) 
txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext())
+                        .getReplicationProperties();
         replicationEnabled = repProperties.isParticipant(txnSubsystem.getId());
         localResourceRepository = (PersistentLocalResourceRepository) 
txnSubsystem.getAsterixAppRuntimeContextProvider()
                 .getLocalResourceRepository();
@@ -240,7 +241,6 @@
                     jobCommitLogCount++;
                     break;
                 case LogType.ENTITY_COMMIT:
-                case LogType.UPSERT_ENTITY_COMMIT:
                     if (partitions.contains(logRecord.getResourcePartition())) 
{
                         analyzeEntityCommitLog(logRecord);
                         entityCommitLogCount++;
@@ -406,7 +406,6 @@
                     case LogType.ENTITY_COMMIT:
                     case LogType.ABORT:
                     case LogType.FLUSH:
-                    case LogType.UPSERT_ENTITY_COMMIT:
                     case LogType.WAIT:
                     case LogType.MARKER:
                         //do nothing
@@ -599,13 +598,12 @@
                         }
                         break;
                     case LogType.ENTITY_COMMIT:
-                    case LogType.UPSERT_ENTITY_COMMIT:
                         if 
(activePartitions.contains(logRecord.getResourcePartition())) {
                             jobLoserEntity2LSNsMap.remove(tempKeyTxnId);
                             entityCommitLogCount++;
                             if (IS_DEBUG_MODE) {
-                                LOGGER.info(Thread.currentThread().getId() + 
"======> entity_commit[" + currentLSN
-                                        + "]" + tempKeyTxnId);
+                                LOGGER.info(Thread.currentThread().getId() + 
"======> entity_commit[" + currentLSN + "]"
+                                        + tempKeyTxnId);
                             }
                         }
                         break;
@@ -687,10 +685,13 @@
                     (ILSMIndex) 
datasetLifecycleManager.getIndex(logRecord.getDatasetId(), 
logRecord.getResourceId());
             ILSMIndexAccessor indexAccessor =
                     index.createAccessor(NoOpOperationCallback.INSTANCE, 
NoOpOperationCallback.INSTANCE);
-            if (logRecord.getNewOp() == IndexOperation.INSERT.ordinal()) {
+            if (logRecord.getNewOp() == IndexOperation.INSERT_BYTE) {
                 indexAccessor.forceDelete(logRecord.getNewValue());
-            } else if (logRecord.getNewOp() == 
IndexOperation.DELETE.ordinal()) {
+            } else if (logRecord.getNewOp() == IndexOperation.DELETE_BYTE) {
                 indexAccessor.forceInsert(logRecord.getNewValue());
+            } else if (logRecord.getNewOp() == IndexOperation.UPSERT_BYTE) {
+                // undo, upsert the old value
+                indexAccessor.forceUpsert(logRecord.getOldValue());
             } else {
                 throw new IllegalStateException("Unsupported OperationType: " 
+ logRecord.getNewOp());
             }
@@ -706,10 +707,13 @@
             ILSMIndex index = (ILSMIndex) 
datasetLifecycleManager.getIndex(datasetId, resourceId);
             ILSMIndexAccessor indexAccessor =
                     index.createAccessor(NoOpOperationCallback.INSTANCE, 
NoOpOperationCallback.INSTANCE);
-            if (logRecord.getNewOp() == IndexOperation.INSERT.ordinal()) {
+            if (logRecord.getNewOp() == IndexOperation.INSERT_BYTE) {
                 indexAccessor.forceInsert(logRecord.getNewValue());
-            } else if (logRecord.getNewOp() == 
IndexOperation.DELETE.ordinal()) {
+            } else if (logRecord.getNewOp() == IndexOperation.DELETE_BYTE) {
                 indexAccessor.forceDelete(logRecord.getNewValue());
+            } else if (logRecord.getNewOp() == IndexOperation.UPSERT_BYTE) {
+                // redo, upsert the new value
+                indexAccessor.forceUpsert(logRecord.getNewValue());
             } else {
                 throw new IllegalStateException("Unsupported OperationType: " 
+ logRecord.getNewOp());
             }
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index cc12f36..47e212e 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -161,18 +161,18 @@
         return new org.apache.asterix.common.transactions.JobId((int) 
jobId.getId());
     }
 
-    public LSMInsertDeleteOperatorNodePushable 
getInsertPipeline(IHyracksTaskContext ctx, Dataset dataset,
-            IAType[] primaryKeyTypes, ARecordType recordType, ARecordType 
metaType,
-            ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> 
mergePolicyProperties, int[] filterFields,
-            int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators,
-            StorageComponentProvider storageComponentProvider) throws 
AlgebricksException, HyracksDataException {
+    public IFrameWriter[] getInsertPipeline(IHyracksTaskContext ctx, Dataset 
dataset, IAType[] primaryKeyTypes,
+            ARecordType recordType, ARecordType metaType, 
ILSMMergePolicyFactory mergePolicyFactory,
+            Map<String, String> mergePolicyProperties, int[] filterFields, 
int[] primaryKeyIndexes,
+            List<Integer> primaryKeyIndicators, StorageComponentProvider 
storageComponentProvider)
+            throws AlgebricksException, HyracksDataException {
         PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, 
primaryKeyTypes, recordType, metaType,
                 mergePolicyFactory, mergePolicyProperties, filterFields, 
primaryKeyIndexes, primaryKeyIndicators,
                 storageComponentProvider);
         IndexOperation op = IndexOperation.INSERT;
         IModificationOperationCallbackFactory modOpCallbackFactory =
                 new 
PrimaryIndexModificationOperationCallbackFactory(getTxnJobId(), 
dataset.getDatasetId(),
-                        primaryIndexInfo.primaryKeyIndexes, 
TXN_SUBSYSTEM_PROVIDER, op, ResourceType.LSM_BTREE, true);
+                        primaryIndexInfo.primaryKeyIndexes, 
TXN_SUBSYSTEM_PROVIDER, op, ResourceType.LSM_BTREE);
         LSMTreeInsertDeleteOperatorDescriptor indexOpDesc =
                 getInsertOpratorDesc(primaryIndexInfo, modOpCallbackFactory);
         IIndexDataflowHelperFactory dataflowHelperFactory =
@@ -185,7 +185,8 @@
                 primaryIndexInfo.primaryKeyIndexes, false, true, PARTITION, 
true);
         insertOp.setOutputFrameWriter(0, commitOp, primaryIndexInfo.rDesc);
         commitOp.setInputRecordDescriptor(0, primaryIndexInfo.rDesc);
-        return insertOp;
+        IFrameWriter[] pipeline = { insertOp, commitOp };
+        return pipeline;
     }
 
     public IPushRuntime getFullScanPipeline(IFrameWriter countOp, 
IHyracksTaskContext ctx, Dataset dataset,
@@ -300,8 +301,7 @@
         Index index = primaryIndexInfo.getIndex();
         MetadataProvider mdProvider = new MetadataProvider(dataverse, 
storageComponentProvider);
         return dataset.getIndexDataflowHelperFactory(mdProvider, index, 
primaryIndexInfo.recordType,
-                primaryIndexInfo.metaType, primaryIndexInfo.mergePolicyFactory,
-                primaryIndexInfo.mergePolicyProperties);
+                primaryIndexInfo.metaType, 
primaryIndexInfo.mergePolicyFactory, primaryIndexInfo.mergePolicyProperties);
     }
 
     public IIndexDataflowHelper getPrimaryIndexDataflowHelper(Dataset dataset, 
IAType[] primaryKeyTypes,
@@ -434,11 +434,10 @@
         private Index index;
         private IStorageComponentProvider storageComponentProvider;
 
-        public PrimaryIndexInfo(Dataset dataset, IAType[] primaryKeyTypes, 
ARecordType recordType,
-                ARecordType metaType, ILSMMergePolicyFactory 
mergePolicyFactory,
-                Map<String, String> mergePolicyProperties, int[] filterFields, 
int[] primaryKeyIndexes,
-                List<Integer> primaryKeyIndicators, IStorageComponentProvider 
storageComponentProvider)
-                throws AlgebricksException {
+        public PrimaryIndexInfo(Dataset dataset, IAType[] primaryKeyTypes, 
ARecordType recordType, ARecordType metaType,
+                ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> 
mergePolicyProperties,
+                int[] filterFields, int[] primaryKeyIndexes, List<Integer> 
primaryKeyIndicators,
+                IStorageComponentProvider storageComponentProvider) throws 
AlgebricksException {
             this.storageComponentProvider = storageComponentProvider;
             this.dataset = dataset;
             this.primaryKeyTypes = primaryKeyTypes;
@@ -477,10 +476,10 @@
             index = new Index(dataset.getDataverseName(), 
dataset.getDatasetName(), dataset.getDatasetName(),
                     IndexType.BTREE, keyFieldNames, keyFieldSourceIndicators, 
keyFieldTypes, false, true,
                     MetadataUtil.PENDING_NO_OP);
-            localResourceFactoryProvider = 
getPrimaryIndexLocalResourceMetadataProvider(storageComponentProvider,
-                    index, dataset, primaryIndexTypeTraits, 
primaryIndexComparatorFactories,
-                    primaryIndexBloomFilterKeyFields, mergePolicyFactory, 
mergePolicyProperties, filterTypeTraits,
-                    filterCmpFactories, btreeFields, filterFields, 
dataset.getIndexOperationTrackerFactory(index));
+            localResourceFactoryProvider = 
getPrimaryIndexLocalResourceMetadataProvider(storageComponentProvider, index,
+                    dataset, primaryIndexTypeTraits, 
primaryIndexComparatorFactories, primaryIndexBloomFilterKeyFields,
+                    mergePolicyFactory, mergePolicyProperties, 
filterTypeTraits, filterCmpFactories, btreeFields,
+                    filterFields, 
dataset.getIndexOperationTrackerFactory(index));
         }
 
         public Index getIndex() {
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
index 1467dbf..435caf7 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
@@ -115,9 +115,10 @@
                 IHyracksTaskContext ctx = nc.createTestContext(true);
                 nc.newJobId();
                 ITransactionContext txnCtx = 
nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true);
-                LSMInsertDeleteOperatorNodePushable insertOp = 
nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
+                LSMInsertDeleteOperatorNodePushable insertOp =
+                        (LSMInsertDeleteOperatorNodePushable) 
nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
                         RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), 
null, null, KEY_INDEXES,
-                        KEY_INDICATORS_LIST, storageManager);
+                        KEY_INDICATORS_LIST, storageManager)[0];
                 insertOp.open();
                 TupleGenerator tupleGenerator = new 
TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
                         RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, 
META_GEN_FUNCTION, UNIQUE_META_FIELDS);
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
index 10e8658..0dec460 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
@@ -123,9 +123,10 @@
                 nc.newJobId();
                 ITransactionContext txnCtx = 
nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true);
                 // Prepare insert operation
-                LSMInsertDeleteOperatorNodePushable insertOp = 
nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
+                LSMInsertDeleteOperatorNodePushable insertOp =
+                        (LSMInsertDeleteOperatorNodePushable) 
nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
                         RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), 
null, null, KEY_INDEXES, KEY_INDICATOR_LIST,
-                        storageManager);
+                                storageManager)[0];
                 insertOp.open();
                 TupleGenerator tupleGenerator = new 
TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATOR,
                         RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, 
META_GEN_FUNCTION, UNIQUE_META_FIELDS);
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
index d691b18..4822c01 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
@@ -133,7 +133,6 @@
         buffer.putInt(jobId);
         switch (logType) {
             case LogType.ENTITY_COMMIT:
-            case LogType.UPSERT_ENTITY_COMMIT:
                 writeEntityInfo(buffer);
                 break;
             case LogType.UPDATE:
@@ -268,7 +267,6 @@
                 computeAndSetLogSize();
                 break;
             case LogType.ENTITY_COMMIT:
-            case LogType.UPSERT_ENTITY_COMMIT:
                 if (readEntityInfo(buffer)) {
                     computeAndSetLogSize();
                 } else {
@@ -428,7 +426,6 @@
                 logSize = JOB_TERMINATE_LOG_SIZE;
                 break;
             case LogType.ENTITY_COMMIT:
-            case LogType.UPSERT_ENTITY_COMMIT:
                 logSize = ENTITY_COMMIT_LOG_BASE_SIZE + PKValueSize;
                 break;
             case LogType.FLUSH:
@@ -457,7 +454,7 @@
         builder.append(" LogType : ").append(LogType.toString(logType));
         builder.append(" LogSize : ").append(logSize);
         builder.append(" JobId : ").append(jobId);
-        if (logType == LogType.ENTITY_COMMIT || logType == 
LogType.UPSERT_ENTITY_COMMIT || logType == LogType.UPDATE) {
+        if (logType == LogType.ENTITY_COMMIT || logType == LogType.UPDATE) {
             builder.append(" DatasetId : ").append(datasetId);
             builder.append(" ResourcePartition : ").append(resourcePartition);
             builder.append(" PKHashValue : ").append(PKHashValue);
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
index 269e4b9..11c45ad 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
@@ -25,7 +25,6 @@
     public static final byte ENTITY_COMMIT = 2;
     public static final byte ABORT = 3;
     public static final byte FLUSH = 4;
-    public static final byte UPSERT_ENTITY_COMMIT = 5;
     public static final byte WAIT = 6;
     public static final byte MARKER = 7;
 
@@ -34,7 +33,6 @@
     private static final String STRING_ENTITY_COMMIT = "ENTITY_COMMIT";
     private static final String STRING_ABORT = "ABORT";
     private static final String STRING_FLUSH = "FLUSH";
-    private static final String STRING_UPSERT_ENTITY_COMMIT = 
"UPSERT_ENTITY_COMMIT";
     private static final String STRING_WAIT = "WAIT";
     private static final String STRING_MARKER = "MARKER";
     private static final String STRING_UNKNOWN_LOG_TYPE = "UNKNOWN_LOG_TYPE";
@@ -51,8 +49,6 @@
                 return STRING_ABORT;
             case LogType.FLUSH:
                 return STRING_FLUSH;
-            case LogType.UPSERT_ENTITY_COMMIT:
-                return STRING_UPSERT_ENTITY_COMMIT;
             case LogType.WAIT:
                 return STRING_WAIT;
             case LogType.MARKER:
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index 51790e6..01b4db2 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -444,7 +444,7 @@
         // locks and secondary index doesn't.
         return new 
SecondaryIndexModificationOperationCallback(metadataIndex.getDatasetId().getId(),
                 metadataIndex.getPrimaryKeyIndexes(), txnCtx, 
transactionSubsystem.getLockManager(),
-                transactionSubsystem, resourceId, metadataStoragePartition, 
ResourceType.LSM_BTREE, indexOp, false);
+                transactionSubsystem, resourceId, metadataStoragePartition, 
ResourceType.LSM_BTREE, indexOp);
     }
 
     @Override
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index f5c6d9a..9645ccf 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -1118,7 +1118,7 @@
                     ? new 
TempDatasetPrimaryIndexModificationOperationCallbackFactory(jobId, datasetId,
                             primaryKeyFields, txnSubsystemProvider, 
IndexOperation.UPSERT, ResourceType.LSM_BTREE)
                     : new UpsertOperationCallbackFactory(jobId, datasetId, 
primaryKeyFields, txnSubsystemProvider,
-                            IndexOperation.UPSERT, ResourceType.LSM_BTREE, 
dataset.hasMetaPart());
+                            IndexOperation.UPSERT, ResourceType.LSM_BTREE);
 
             LockThenSearchOperationCallbackFactory searchCallbackFactory = new 
LockThenSearchOperationCallbackFactory(
                     jobId, datasetId, primaryKeyFields, txnSubsystemProvider, 
ResourceType.LSM_BTREE);
@@ -1319,8 +1319,7 @@
                             primaryKeyFields, txnSubsystemProvider, indexOp, 
ResourceType.LSM_BTREE)
                     : new PrimaryIndexModificationOperationCallbackFactory(
                             ((JobEventListenerFactory) 
spec.getJobletEventListenerFactory()).getJobId(), datasetId,
-                            primaryKeyFields, txnSubsystemProvider, indexOp, 
ResourceType.LSM_BTREE,
-                            dataset.hasMetaPart());
+                            primaryKeyFields, txnSubsystemProvider, indexOp, 
ResourceType.LSM_BTREE);
 
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                     DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
@@ -1517,7 +1516,7 @@
                             ResourceType.LSM_BTREE)
                     : new 
SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
                             modificationCallbackPrimaryKeyFields, 
txnSubsystemProvider, indexOp,
-                            ResourceType.LSM_BTREE, dataset.hasMetaPart());
+                            ResourceType.LSM_BTREE);
 
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                     DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
@@ -1675,7 +1674,7 @@
                             ResourceType.LSM_RTREE)
                     : new 
SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
                             modificationCallbackPrimaryKeyFields, 
txnSubsystemProvider, indexOp,
-                            ResourceType.LSM_RTREE, dataset.hasMetaPart());
+                            ResourceType.LSM_RTREE);
 
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                     DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
@@ -1885,7 +1884,7 @@
                             ResourceType.LSM_INVERTED_INDEX)
                     : new 
SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
                             modificationCallbackPrimaryKeyFields, 
txnSubsystemProvider, indexOp,
-                            ResourceType.LSM_INVERTED_INDEX, 
dataset.hasMetaPart());
+                            ResourceType.LSM_INVERTED_INDEX);
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                     DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
             IIndexDataflowHelperFactory indexDataFlowFactory = 
dataset.getIndexDataflowHelperFactory(this,
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index 2e328f9..8ab9d10 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -29,7 +29,6 @@
 import org.apache.asterix.active.ActiveJobNotificationHandler;
 import org.apache.asterix.active.IActiveEntityEventsListener;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.metadata.IDataset;
 import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
@@ -37,6 +36,7 @@
 import 
org.apache.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallbackFactory;
 import 
org.apache.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
 import 
org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
+import org.apache.asterix.common.metadata.IDataset;
 import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
 import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.common.utils.JobUtils;
@@ -520,18 +520,16 @@
         } else if (index.isPrimaryIndex()) {
             return op == IndexOperation.UPSERT
                     ? new UpsertOperationCallbackFactory(jobId, 
getDatasetId(), primaryKeyFields,
-                            
componentProvider.getTransactionSubsystemProvider(), op, index.resourceType(),
-                            hasMetaPart())
+                            
componentProvider.getTransactionSubsystemProvider(), op, index.resourceType())
                     : op == IndexOperation.DELETE || op == 
IndexOperation.INSERT
                             ? new 
PrimaryIndexModificationOperationCallbackFactory(jobId, getDatasetId(),
                                     primaryKeyFields, 
componentProvider.getTransactionSubsystemProvider(), op,
-                                    index.resourceType(), hasMetaPart())
+                                    index.resourceType())
                             : NoOpOperationCallbackFactory.INSTANCE;
         } else {
             return op == IndexOperation.DELETE || op == IndexOperation.INSERT 
|| op == IndexOperation.UPSERT
                     ? new 
SecondaryIndexModificationOperationCallbackFactory(jobId, getDatasetId(), 
primaryKeyFields,
-                            
componentProvider.getTransactionSubsystemProvider(), op, index.resourceType(),
-                            hasMetaPart())
+                            
componentProvider.getTransactionSubsystemProvider(), op, index.resourceType())
                     : NoOpOperationCallbackFactory.INSTANCE;
         }
     }
@@ -572,9 +570,9 @@
     }
 
     public IPushRuntimeFactory getCommitRuntimeFactory(JobId jobId, int[] 
primaryKeyFields,
-            MetadataProvider metadataProvider, int upsertVarIdx, int[] 
datasetPartitions, boolean isSink) {
+            MetadataProvider metadataProvider, int[] datasetPartitions, 
boolean isSink) {
         return new CommitRuntimeFactory(jobId, datasetId, primaryKeyFields,
-                metadataProvider.isTemporaryDatasetWriteJob(), 
metadataProvider.isWriteTransaction(), upsertVarIdx,
+                metadataProvider.isTemporaryDatasetWriteJob(), 
metadataProvider.isWriteTransaction(),
                 datasetPartitions, isSink);
     }
 }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index 044707a..85e2bef 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -462,7 +462,6 @@
                 switch (remoteLog.getLogType()) {
                     case LogType.UPDATE:
                     case LogType.ENTITY_COMMIT:
-                    case LogType.UPSERT_ENTITY_COMMIT:
                         //if the log partition belongs to a partitions hosted on this node, replicate it
                         if (nodeHostedPartitions.contains(remoteLog.getResourcePartition())) {
                             logManager.log(remoteLog);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index 6869523..d07bab3 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -49,7 +49,6 @@
 import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
 import org.apache.hyracks.storage.am.btree.util.BTreeUtils;
 import org.apache.hyracks.storage.am.common.api.IIndexCursor;
-import 
org.apache.hyracks.storage.am.common.api.IModificationOperationCallback.Operation;
 import org.apache.hyracks.storage.am.common.api.ITreeIndex;
 import org.apache.hyracks.storage.am.common.api.IndexException;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
@@ -84,8 +83,8 @@
     private final int metaFieldIndex;
     private LockThenSearchOperationCallback searchCallback;
 
-    public LSMPrimaryUpsertOperatorNodePushable(IIndexOperatorDescriptor 
opDesc, IHyracksTaskContext ctx,
-            int partition, int[] fieldPermutation, IRecordDescriptorProvider 
recordDescProvider, int numOfPrimaryKeys,
+    public LSMPrimaryUpsertOperatorNodePushable(IIndexOperatorDescriptor 
opDesc, IHyracksTaskContext ctx, int partition,
+            int[] fieldPermutation, IRecordDescriptorProvider 
recordDescProvider, int numOfPrimaryKeys,
             ARecordType recordType, int filterFieldIndex) throws 
HyracksDataException {
         super(opDesc, ctx, partition, fieldPermutation, recordDescProvider, 
IndexOperation.UPSERT);
         this.key = new PermutingFrameTupleReference();
@@ -140,15 +139,15 @@
             tb = new ArrayTupleBuilder(recordDesc.getFieldCount());
             dos = tb.getDataOutput();
             appender = new FrameTupleAppender(new VSizeFrame(ctx), true);
-            modCallback = 
opDesc.getModificationOpCallbackFactory().createModificationOperationCallback(
-                    indexHelper.getResource(), ctx, this);
+            modCallback = opDesc.getModificationOpCallbackFactory()
+                    
.createModificationOperationCallback(indexHelper.getResource(), ctx, this);
             searchCallback = (LockThenSearchOperationCallback) 
opDesc.getSearchOpCallbackFactory()
                     
.createSearchOperationCallback(indexHelper.getResource().getId(), ctx, this);
             indexAccessor = index.createAccessor(modCallback, searchCallback);
             cursor = indexAccessor.createSearchCursor(false);
             frameTuple = new FrameTupleReference();
-            IAppRuntimeContext runtimeCtx = (IAppRuntimeContext) 
ctx.getJobletContext()
-                    .getApplicationContext().getApplicationObject();
+            IAppRuntimeContext runtimeCtx =
+                    (IAppRuntimeContext) 
ctx.getJobletContext().getApplicationContext().getApplicationObject();
             LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) index,
                     runtimeCtx.getTransactionSubsystem().getLogManager());
         } catch (Exception e) {
@@ -221,17 +220,19 @@
                     }
                     // if with filters, append the filter
                     if (isFiltered) {
-                        dos.write(prevTuple.getFieldData(filterFieldIndex),
-                                prevTuple.getFieldStart(filterFieldIndex),
+                        dos.write(prevTuple.getFieldData(filterFieldIndex), 
prevTuple.getFieldStart(filterFieldIndex),
                                 prevTuple.getFieldLength(filterFieldIndex));
                         tb.addFieldEndOffset();
                     }
-                    modCallback.setOp(Operation.DELETE);
-                    if (firstModification) {
-                        lsmAccessor.delete(prevTuple);
-                        firstModification = false;
-                    } else {
-                        lsmAccessor.forceDelete(prevTuple);
+                    if (isNull(tuple, numOfPrimaryKeys)) {
+                        // Only delete if it is a delete and not upsert
+                        modCallback.setOp(IndexOperation.DELETE_BYTE);
+                        if (firstModification) {
+                            lsmAccessor.delete(prevTuple);
+                            firstModification = false;
+                        } else {
+                            lsmAccessor.forceDelete(prevTuple);
+                        }
                     }
                 } else {
                     prevTuple = null;
@@ -246,12 +247,12 @@
                     cursor.reset();
                 }
                 if (!isNull(tuple, numOfPrimaryKeys)) {
-                    modCallback.setOp(Operation.INSERT);
+                    modCallback.setOp(IndexOperation.UPSERT_BYTE);
                     if (firstModification) {
-                        lsmAccessor.insert(tuple);
+                        lsmAccessor.upsert(tuple);
                         firstModification = false;
                     } else {
-                        lsmAccessor.forceInsert(tuple);
+                        lsmAccessor.forceUpsert(tuple);
                     }
                     recordWasInserted = true;
                 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
index eab9cc7..4dd1eca 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
@@ -24,7 +24,6 @@
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-import 
org.apache.hyracks.storage.am.common.api.IModificationOperationCallback.Operation;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import 
org.apache.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
@@ -109,12 +108,12 @@
                 }
                 if (!isPrevValueNull) {
                     // previous is not null, we need to delete previous
-                    modCallback.setOp(Operation.DELETE);
+                    modCallback.setOp(IndexOperation.DELETE_BYTE);
                     lsmAccessor.forceDelete(prevValueTuple);
                 }
                 if (!isNewNull) {
                     // new is not null, we need to insert the new value
-                    modCallback.setOp(Operation.INSERT);
+                    modCallback.setOp(IndexOperation.INSERT_BYTE);
                     lsmAccessor.forceInsert(tuple);
                 }
 
diff --git a/asterixdb/asterix-transactions/pom.xml b/asterixdb/asterix-transactions/pom.xml
index a65a436..2ff0ccd 100644
--- a/asterixdb/asterix-transactions/pom.xml
+++ b/asterixdb/asterix-transactions/pom.xml
@@ -106,11 +106,6 @@
       <scope>compile</scope>
     </dependency>
     <dependency>
-      <groupId>org.apache.asterix</groupId>
-      <artifactId>asterix-om</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
index 2a3467e..3893854 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
@@ -28,14 +28,10 @@
 import org.apache.asterix.common.transactions.LogType;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-import 
org.apache.hyracks.storage.am.common.api.IModificationOperationCallback.Operation;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.common.tuples.SimpleTupleWriter;
 
 public abstract class AbstractIndexModificationOperationCallback extends 
AbstractOperationCallback {
-
-    private static final byte INSERT_OP = (byte) 
IndexOperation.INSERT.ordinal();
-    private static final byte DELETE_OP = (byte) 
IndexOperation.DELETE.ordinal();
     protected final long resourceId;
     protected final byte resourceType;
     protected final IndexOperation indexOp;
@@ -81,14 +77,7 @@
         txnSubsystem.getLogManager().log(logRecord);
     }
 
-    public void setOp(Operation op) throws HyracksDataException {
-        switch (op) {
-            case DELETE:
-                logRecord.setNewOp(DELETE_OP);
-                break;
-            case INSERT:
-                logRecord.setNewOp(INSERT_OP);
-                break;
-        }
+    public void setOp(byte op) throws HyracksDataException {
+        logRecord.setNewOp(op);
     }
 }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
index c627367..0747a8d 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
@@ -40,16 +40,13 @@
         implements IModificationOperationCallback {
 
     private final LSMInsertDeleteOperatorNodePushable operatorNodePushable;
-    private final boolean logBeforeImage;
 
     public PrimaryIndexModificationOperationCallback(int datasetId, int[] 
primaryKeyFields, ITransactionContext txnCtx,
             ILockManager lockManager, ITransactionSubsystem txnSubsystem, long 
resourceId, int resourcePartition,
-            byte resourceType, IndexOperation indexOp, IOperatorNodePushable 
operatorNodePushable,
-            boolean logBeforeImage) {
+            byte resourceType, IndexOperation indexOp, IOperatorNodePushable 
operatorNodePushable) {
         super(datasetId, primaryKeyFields, txnCtx, lockManager, txnSubsystem, 
resourceId, resourcePartition,
                 resourceType, indexOp);
         this.operatorNodePushable = (LSMInsertDeleteOperatorNodePushable) 
operatorNodePushable;
-        this.logBeforeImage = logBeforeImage;
     }
 
     @Override
@@ -102,11 +99,7 @@
     public void found(ITupleReference before, ITupleReference after) throws 
HyracksDataException {
         try {
             int pkHash = computePrimaryKeyHashValue(after, primaryKeyFields);
-            if (logBeforeImage) {
-                log(pkHash, after, before);
-            } else {
-                log(pkHash, after, null);
-            }
+            log(pkHash, after, before);
         } catch (ACIDException e) {
             throw new HyracksDataException(e);
         }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
index 4cbb8cf..fcd4cd5 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
@@ -46,14 +46,11 @@
 
     private static final long serialVersionUID = 1L;
     private final IndexOperation indexOp;
-    private final boolean logBeforeImage;
 
     public PrimaryIndexModificationOperationCallbackFactory(JobId jobId, int 
datasetId, int[] primaryKeyFields,
-            ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation 
indexOp, byte resourceType,
-            boolean logBeforeImage) {
+            ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation 
indexOp, byte resourceType) {
         super(jobId, datasetId, primaryKeyFields, txnSubsystemProvider, 
resourceType);
         this.indexOp = indexOp;
-        this.logBeforeImage = logBeforeImage;
     }
 
     @Override
@@ -72,7 +69,7 @@
             Resource aResource = (Resource) resource.getResource();
             IModificationOperationCallback modCallback = new 
PrimaryIndexModificationOperationCallback(datasetId,
                     primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), 
txnSubsystem, resource.getId(),
-                    aResource.partition(), resourceType, indexOp, 
operatorNodePushable, logBeforeImage);
+                    aResource.partition(), resourceType, indexOp, 
operatorNodePushable);
             txnCtx.registerIndexAndCallback(resource.getId(), index, 
(AbstractOperationCallback) modCallback, true);
             return modCallback;
         } catch (ACIDException e) {
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
index 974e631..845e9d7 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
@@ -37,15 +37,13 @@
         implements IModificationOperationCallback {
 
     protected final IndexOperation oldOp;
-    private final boolean logBeforeImage;
 
     public SecondaryIndexModificationOperationCallback(int datasetId, int[] 
primaryKeyFields,
             ITransactionContext txnCtx, ILockManager lockManager, 
ITransactionSubsystem txnSubsystem, long resourceId,
-            int resourcePartition, byte resourceType, IndexOperation indexOp, 
boolean logBeforeImage) {
+            int resourcePartition, byte resourceType, IndexOperation indexOp) {
         super(datasetId, primaryKeyFields, txnCtx, lockManager, txnSubsystem, 
resourceId, resourcePartition,
                 resourceType, indexOp);
         oldOp = (indexOp == IndexOperation.DELETE) ? IndexOperation.INSERT : 
IndexOperation.DELETE;
-        this.logBeforeImage = logBeforeImage;
     }
 
     @Override
@@ -57,7 +55,7 @@
     public void found(ITupleReference before, ITupleReference after) throws 
HyracksDataException {
         try {
             int pkHash = computePrimaryKeyHashValue(after, primaryKeyFields);
-            this.log(pkHash, after, logBeforeImage ? before : null);
+            this.log(pkHash, after, before);
         } catch (ACIDException e) {
             throw new HyracksDataException(e);
         }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
index 3925bba..cce3710 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
@@ -42,14 +42,11 @@
 
     private static final long serialVersionUID = 1L;
     private final IndexOperation indexOp;
-    private final boolean logBeforeImage;
 
     public SecondaryIndexModificationOperationCallbackFactory(JobId jobId, int 
datasetId, int[] primaryKeyFields,
-            ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation 
indexOp, byte resourceType,
-            boolean logBeforeImage) {
+            ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation 
indexOp, byte resourceType) {
         super(jobId, datasetId, primaryKeyFields, txnSubsystemProvider, 
resourceType);
         this.indexOp = indexOp;
-        this.logBeforeImage = logBeforeImage;
     }
 
     @Override
@@ -68,7 +65,7 @@
             Resource aResource = (Resource) resource.getResource();
             IModificationOperationCallback modCallback = new 
SecondaryIndexModificationOperationCallback(datasetId,
                     primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), 
txnSubsystem, resource.getId(),
-                    aResource.partition(), resourceType, indexOp, 
logBeforeImage);
+                    aResource.partition(), resourceType, indexOp);
             txnCtx.registerIndexAndCallback(resource.getId(), index, 
(AbstractOperationCallback) modCallback, false);
             return modCallback;
         } catch (ACIDException e) {
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java
index 13d2d57..413ef65 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java
@@ -29,14 +29,12 @@
 
 public class UpsertOperationCallback extends 
AbstractIndexModificationOperationCallback
         implements IModificationOperationCallback {
-    private final boolean logBeforeImage;
 
     public UpsertOperationCallback(int datasetId, int[] primaryKeyFields, 
ITransactionContext txnCtx,
             ILockManager lockManager, ITransactionSubsystem txnSubsystem, long 
resourceId, int resourcePartition,
-            byte resourceType, IndexOperation indexOp, boolean logBeforeImage) {
+            byte resourceType, IndexOperation indexOp) {
         super(datasetId, primaryKeyFields, txnCtx, lockManager, txnSubsystem, 
resourceId, resourcePartition,
                 resourceType, indexOp);
-        this.logBeforeImage = logBeforeImage;
     }
 
     @Override
@@ -48,7 +46,7 @@
     public void found(ITupleReference before, ITupleReference after) throws 
HyracksDataException {
         try {
             int pkHash = computePrimaryKeyHashValue(after, primaryKeyFields);
-            log(pkHash, after, logBeforeImage ? before : null);
+            log(pkHash, after, before);
         } catch (ACIDException e) {
             throw new HyracksDataException(e);
         }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
index 9349e93..cd702ed 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
@@ -41,14 +41,11 @@
 
     private static final long serialVersionUID = 1L;
     private final IndexOperation indexOp;
-    private final boolean logBeforeImage;
 
     public UpsertOperationCallbackFactory(JobId jobId, int datasetId, int[] 
primaryKeyFields,
-            ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation 
indexOp, byte resourceType,
-            boolean logBeforeImage) {
+            ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation 
indexOp, byte resourceType) {
         super(jobId, datasetId, primaryKeyFields, txnSubsystemProvider, 
resourceType);
         this.indexOp = indexOp;
-        this.logBeforeImage = logBeforeImage;
     }
 
     @Override
@@ -67,7 +64,7 @@
             ITransactionContext txnCtx = 
txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
             IModificationOperationCallback modCallback = new 
UpsertOperationCallback(datasetId, primaryKeyFields,
                     txnCtx, txnSubsystem.getLockManager(),
-                    txnSubsystem, resource.getId(), aResource.partition(), 
resourceType, indexOp, logBeforeImage);
+                    txnSubsystem, resource.getId(), aResource.partition(), 
resourceType, indexOp);
             txnCtx.registerIndexAndCallback(resource.getId(), index, 
(AbstractOperationCallback) modCallback, true);
             return modCallback;
         } catch (ACIDException e) {
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
index 536e657..2752461 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
@@ -34,18 +34,16 @@
     private final int[] primaryKeyFields;
     private final boolean isTemporaryDatasetWriteJob;
     private final boolean isWriteTransaction;
-    private final int upsertVarIdx;
     private int[] datasetPartitions;
     private final boolean isSink;
 
     public CommitRuntimeFactory(JobId jobId, int datasetId, int[] 
primaryKeyFields, boolean isTemporaryDatasetWriteJob,
-            boolean isWriteTransaction, int upsertVarIdx, int[] 
datasetPartitions, boolean isSink) {
+            boolean isWriteTransaction, int[] datasetPartitions, boolean 
isSink) {
         this.jobId = jobId;
         this.datasetId = datasetId;
         this.primaryKeyFields = primaryKeyFields;
         this.isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob;
         this.isWriteTransaction = isWriteTransaction;
-        this.upsertVarIdx = upsertVarIdx;
         this.datasetPartitions = datasetPartitions;
         this.isSink = isSink;
     }
@@ -57,13 +55,7 @@
 
     @Override
     public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws 
HyracksDataException {
-        if (upsertVarIdx >= 0) {
-            return new UpsertCommitRuntime(ctx, jobId, datasetId, 
primaryKeyFields, isTemporaryDatasetWriteJob,
-                    isWriteTransaction, 
datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()],
-                    upsertVarIdx, isSink);
-        } else {
             return new CommitRuntime(ctx, jobId, datasetId, primaryKeyFields, 
isTemporaryDatasetWriteJob,
                     isWriteTransaction, 
datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink);
-        }
     }
 }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/UpsertCommitRuntime.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/UpsertCommitRuntime.java
deleted file mode 100644
index 9b2fe36..0000000
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/UpsertCommitRuntime.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.transaction.management.runtime;
-
-import java.nio.ByteBuffer;
-
-import org.apache.asterix.common.transactions.JobId;
-import org.apache.asterix.common.transactions.LogType;
-import org.apache.asterix.common.utils.TransactionUtil;
-import 
org.apache.asterix.dataflow.data.nontagged.serde.ABooleanSerializerDeserializer;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-public class UpsertCommitRuntime extends CommitRuntime {
-    private final int upsertIdx;
-
-    public UpsertCommitRuntime(IHyracksTaskContext ctx, JobId jobId, int 
datasetId, int[] primaryKeyFields,
-            boolean isTemporaryDatasetWriteJob, boolean isWriteTransaction, 
int resourcePartition, int upsertIdx,
-            boolean isSink) {
-        super(ctx, jobId, datasetId, primaryKeyFields, 
isTemporaryDatasetWriteJob, isWriteTransaction,
-                resourcePartition, isSink);
-        this.upsertIdx = upsertIdx;
-    }
-
-    @Override
-    protected void formLogRecord(ByteBuffer buffer, int t) {
-        boolean isNull = 
ABooleanSerializerDeserializer.getBoolean(buffer.array(), 
tAccess.getFieldSlotsLength()
-                + tAccess.getTupleStartOffset(t) + 
tAccess.getFieldStartOffset(t, upsertIdx) + 1);
-        if (isNull) {
-            // Previous record not found (insert)
-            super.formLogRecord(buffer, t);
-        } else {
-            // Previous record found (delete + insert)
-            int pkHash = computePrimaryKeyHashValue(tRef, primaryKeyFields);
-            TransactionUtil.formEntityCommitLogRecord(logRecord, 
transactionContext, datasetId, pkHash, tRef,
-                    primaryKeyFields, resourcePartition, 
LogType.UPSERT_ENTITY_COMMIT);
-        }
-    }
-}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
index 424e800..15918d1 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
@@ -255,18 +255,13 @@
             LogRecord logRecord = logBufferTailReader.next();
             while (logRecord != null) {
                 if (logRecord.getLogSource() == LogSource.LOCAL) {
-                    if (logRecord.getLogType() == LogType.ENTITY_COMMIT
-                            || logRecord.getLogType() == 
LogType.UPSERT_ENTITY_COMMIT) {
+                    if (logRecord.getLogType() == LogType.ENTITY_COMMIT) {
                         reusableJobId.setId(logRecord.getJobId());
                         txnCtx = 
txnSubsystem.getTransactionManager().getTransactionContext(reusableJobId, 
false);
                         reusableDsId.setId(logRecord.getDatasetId());
                         txnSubsystem.getLockManager().unlock(reusableDsId, 
logRecord.getPKHashValue(), LockMode.ANY,
                                 txnCtx);
                         txnCtx.notifyOptracker(false);
-                        if (logRecord.getLogType() == LogType.UPSERT_ENTITY_COMMIT) {
-                            // since this operation consisted of delete and insert, we need to notify the optracker twice
-                            txnCtx.notifyOptracker(false);
-                        }
                         if (TransactionUtil.PROFILE_MODE) {
                             txnSubsystem.incrementEntityCommitCount();
                         }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
index d885f00..e3ad3dd 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
@@ -48,7 +48,6 @@
         if (shouldReplicate) {
             switch (logRecord.getLogType()) {
                 case LogType.ENTITY_COMMIT:
-                case LogType.UPSERT_ENTITY_COMMIT:
                 case LogType.UPDATE:
                 case LogType.FLUSH:
                     shouldReplicate = 
replicationStrategy.isMatch(logRecord.getDatasetId());
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputPushRuntime.java
index 89d043c..70c5557 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputPushRuntime.java
@@ -29,4 +29,8 @@
         this.inputRecordDesc = recordDescriptor;
     }
 
+    public RecordDescriptor getInputRecordDescriptor() {
+        return inputRecordDesc;
+    }
+
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallback.java
index f22c239..1684923 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallback.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallback.java
@@ -29,10 +29,6 @@
  * @author zheilbron
  */
 public interface IModificationOperationCallback {
-    public enum Operation {
-        INSERT,
-        DELETE
-    }
 
     /**
      * This method is called on a tuple that is about to traverse an index's structure
@@ -60,5 +56,5 @@
      * @param op
      * @throws HyracksDataException
      */
-    public void setOp(Operation op) throws HyracksDataException;
+    public void setOp(byte op) throws HyracksDataException;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallback.java
index e8ab8dc..98173bd 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallback.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallback.java
@@ -61,7 +61,7 @@
     }
 
     @Override
-    public void setOp(Operation op) throws HyracksDataException {
+    public void setOp(byte op) throws HyracksDataException {
         // Do nothing.
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/IndexOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/IndexOperation.java
index 43e0889..b416f5a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/IndexOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/IndexOperation.java
@@ -32,5 +32,8 @@
     MERGE,
     FULL_MERGE,
     FLUSH,
-    REPLICATE
+    REPLICATE;
+    public static final byte INSERT_BYTE = (byte) INSERT.ordinal();
+    public static final byte DELETE_BYTE = (byte) DELETE.ordinal();
+    public static final byte UPSERT_BYTE = (byte) UPSERT.ordinal();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 53b8405..473e28d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -57,10 +57,10 @@
 import org.apache.hyracks.storage.am.common.ophelpers.MultiComparator;
 import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
 import org.apache.hyracks.storage.am.lsm.btree.tuples.LSMBTreeTupleReference;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterFrameFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
@@ -286,10 +286,10 @@
             case PHYSICALDELETE:
             case FLUSH:
             case DELETE:
+            case UPSERT:
                 operationalComponents.add(memoryComponents.get(cmc));
                 break;
             case INSERT:
-            case UPSERT:
                 addOperationalMutableComponents(operationalComponents);
                 operationalComponents.addAll(immutableComponents);
                 break;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
index fecc674..3e3b0d4 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
@@ -117,6 +117,8 @@
 
     void forceInsert(ITupleReference tuple) throws HyracksDataException, IndexException;
 
+    void forceUpsert(ITupleReference tuple) throws HyracksDataException, IndexException;
+
     void forceDelete(ITupleReference tuple) throws HyracksDataException, IndexException;
 
     void scheduleReplication(List<ILSMDiskComponent> diskComponents, boolean bulkload, LSMOperationType opType)
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index 4199cfb..af62cbf 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -160,6 +160,12 @@
     }
 
     @Override
+    public void forceUpsert(ITupleReference tuple) throws HyracksDataException, IndexException {
+        ctx.setOperation(IndexOperation.UPSERT);
+        lsmHarness.forceModify(ctx, tuple);
+    }
+
+    @Override
     public void forceDelete(ITupleReference tuple) throws HyracksDataException, IndexException {
         ctx.setOperation(IndexOperation.DELETE);
         lsmHarness.forceModify(ctx, tuple);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
index cef4257..ff09f5b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
@@ -185,4 +185,9 @@
         throw new UnsupportedOperationException("Cannot open inverted list cursor on lsm inverted index.");
     }
 
+    @Override
+    public void forceUpsert(ITupleReference tuple) throws HyracksDataException, IndexException {
+        throw new UnsupportedOperationException("Upsert not supported by lsm inverted index.");
+    }
+
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/btree/AbstractModificationOperationCallbackTest.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/btree/AbstractModificationOperationCallbackTest.java
index 6da9334..1dc3a73 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/btree/AbstractModificationOperationCallbackTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/btree/AbstractModificationOperationCallbackTest.java
@@ -100,7 +100,7 @@
         }
 
         @Override
-        public void setOp(Operation op) throws HyracksDataException {
+        public void setOp(byte op) throws HyracksDataException {
         }
 
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/TestOperationCallback.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/TestOperationCallback.java
index 15d8a60..089595f 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/TestOperationCallback.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/TestOperationCallback.java
@@ -67,7 +67,7 @@
     }
 
     @Override
-    public void setOp(Operation op) throws HyracksDataException {
+    public void setOp(byte op) throws HyracksDataException {
         // Do nothing.
     }
 

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1554
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ice5296267033cd7debe76894c864c6411f761d83
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com>

Reply via email to