abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1613

Change subject: Fix creation of callback factories
......................................................................

Fix creation of callback factories

Change-Id: Idbeacf5af01b77c5f81b59aa6acec9b13762d629
---
M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInvertedIndexInsertDeleteOperatorDescriptor.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMInvertedIndexUpsertOperatorDescriptor.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/SynchronousScheduler.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexInsertUpdateDeleteOperator.java
8 files changed, 89 insertions(+), 103 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/13/1613/1

diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
index 12f28c5..ac3caf3 100644
--- 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
+++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
@@ -18,6 +18,9 @@
  */
 package org.apache.asterix.active;
 
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
 import org.apache.asterix.active.message.ActivePartitionMessage;
 import org.apache.asterix.common.api.IAppRuntimeContext;
 import org.apache.hyracks.api.comm.IFrameWriter;
@@ -28,6 +31,7 @@
 public abstract class ActiveSourceOperatorNodePushable extends 
AbstractUnaryOutputSourceOperatorNodePushable
         implements IActiveRuntime {
 
+    private final Logger LOGGER = 
Logger.getLogger(ActiveSourceOperatorNodePushable.class.getName());
     protected final IHyracksTaskContext ctx;
     protected final ActiveManager activeManager;
     /** A unique identifier for the runtime **/
@@ -79,6 +83,7 @@
 
     @Override
     public final void initialize() throws HyracksDataException {
+        LOGGER.log(Level.INFO, "initialize() called on 
ActiveSourceOperatorNodePushable");
         activeManager.registerRuntime(this);
         try {
             // notify cc that runtime has been registered
@@ -86,15 +91,18 @@
                     ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED), null);
             start();
         } catch (InterruptedException e) {
+            LOGGER.log(Level.INFO, "initialize() interrupted on 
ActiveSourceOperatorNodePushable", e);
             Thread.currentThread().interrupt();
             throw new HyracksDataException(e);
         } catch (Exception e) {
+            LOGGER.log(Level.INFO, "initialize() failed on 
ActiveSourceOperatorNodePushable", e);
             throw new HyracksDataException(e);
         } finally {
             synchronized (this) {
                 done = true;
                 notifyAll();
             }
+            LOGGER.log(Level.INFO, "initialize() returning on 
ActiveSourceOperatorNodePushable");
         }
     }
 
@@ -105,10 +113,12 @@
             ctx.sendApplicationMessageToCC(new 
ActivePartitionMessage(runtimeId, ctx.getJobletContext().getJobId(),
                     ActivePartitionMessage.ACTIVE_RUNTIME_DEREGISTERED), null);
         } catch (Exception e) {
+            LOGGER.log(Level.INFO, "deinitialize() failed on 
ActiveSourceOperatorNodePushable", e);
             throw new HyracksDataException(e);
+        } finally {
+            LOGGER.log(Level.INFO, "deinitialize() returning on 
ActiveSourceOperatorNodePushable");
         }
     }
-
 
     @Override
     public final IFrameWriter getInputFrameWriter(int index) {
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInvertedIndexInsertDeleteOperatorDescriptor.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInvertedIndexInsertDeleteOperatorDescriptor.java
index 2ff0617..e99cba5 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInvertedIndexInsertDeleteOperatorDescriptor.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInvertedIndexInsertDeleteOperatorDescriptor.java
@@ -30,6 +30,7 @@
 import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
 import 
org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.api.IPageManagerFactory;
+import 
org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
 import 
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
@@ -50,11 +51,13 @@
             IBinaryComparatorFactory[] invListComparatorFactories, 
IBinaryTokenizerFactory tokenizerFactory,
             int[] fieldPermutation, IndexOperation op, 
IIndexDataflowHelperFactory dataflowHelperFactory,
             ITupleFilterFactory tupleFilterFactory,
-            IModificationOperationCallbackFactory 
modificationOpCallbackFactory, String indexName,
+            IModificationOperationCallbackFactory 
modificationOpCallbackFactory,
+            ISearchOperationCallbackFactory searchCallbackFactory, String 
indexName,
             IPageManagerFactory pageManagerFactory) {
         super(spec, recDesc, storageManager, fileSplitProvider, 
lifecycleManagerProvider, tokenTypeTraits,
                 tokenComparatorFactories, invListsTypeTraits, 
invListComparatorFactories, tokenizerFactory,
                 fieldPermutation, op, dataflowHelperFactory, 
tupleFilterFactory, modificationOpCallbackFactory,
+                searchCallbackFactory,
                 pageManagerFactory);
         this.indexName = indexName;
     }
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 39ea54d..175fe2b 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -31,14 +31,12 @@
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.config.StorageProperties;
 import org.apache.asterix.common.context.IStorageComponentProvider;
-import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 import org.apache.asterix.common.dataflow.IApplicationContextInfo;
 import 
org.apache.asterix.common.dataflow.LSMInvertedIndexInsertDeleteOperatorDescriptor;
 import 
org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.library.ILibraryManager;
-import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
 import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.external.adapter.factory.LookupAdapterFactory;
@@ -89,15 +87,6 @@
 import org.apache.asterix.runtime.utils.AppContextInfo;
 import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.asterix.runtime.utils.RuntimeComponentsProvider;
-import 
org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
-import 
org.apache.asterix.transaction.management.opcallbacks.LockThenSearchOperationCallbackFactory;
-import 
org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
-import 
org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory;
-import 
org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexModificationOperationCallbackFactory;
-import 
org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexSearchOperationCallbackFactory;
-import 
org.apache.asterix.transaction.management.opcallbacks.TempDatasetPrimaryIndexModificationOperationCallbackFactory;
-import 
org.apache.asterix.transaction.management.opcallbacks.TempDatasetSecondaryIndexModificationOperationCallbackFactory;
-import 
org.apache.asterix.transaction.management.opcallbacks.UpsertOperationCallbackFactory;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -146,7 +135,6 @@
 import 
org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import 
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import 
org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
-import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 import 
org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
@@ -157,7 +145,6 @@
 public class MetadataProvider implements IMetadataProvider<DataSourceId, 
String> {
 
     private final IStorageComponentProvider storaegComponentProvider;
-    private final ITransactionSubsystemProvider txnSubsystemProvider;
     private final IMetadataPageManagerFactory metadataPageManagerFactory;
     private final IPrimitiveValueProviderFactory primitiveValueProviderFactory;
     private final StorageProperties storageProperties;
@@ -182,7 +169,6 @@
         this.storaegComponentProvider = componentProvider;
         storageProperties = AppContextInfo.INSTANCE.getStorageProperties();
         libraryManager = AppContextInfo.INSTANCE.getLibraryManager();
-        txnSubsystemProvider = 
componentProvider.getTransactionSubsystemProvider();
         metadataPageManagerFactory = 
componentProvider.getMetadataPageManagerFactory();
         primitiveValueProviderFactory = 
componentProvider.getPrimitiveValueProviderFactory();
     }
@@ -521,27 +507,13 @@
             IApplicationContextInfo appContext = (IApplicationContextInfo) 
context.getAppContext();
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
             spPc = getSplitProviderAndConstraints(dataset, 
theIndex.getIndexName());
-
-            ISearchOperationCallbackFactory searchCallbackFactory;
-            if (isSecondary) {
-                searchCallbackFactory = temp ? 
NoOpOperationCallbackFactory.INSTANCE
-                        : new SecondaryIndexSearchOperationCallbackFactory();
-            } else {
-                int datasetId = dataset.getDatasetId();
-                int[] primaryKeyFields = new int[numPrimaryKeys];
-                for (int i = 0; i < numPrimaryKeys; i++) {
-                    primaryKeyFields[i] = i;
-                }
-
-                /**
-                 * Due to the read-committed isolation level,
-                 * we may acquire very short duration lock(i.e., instant lock) 
for readers.
-                 */
-                searchCallbackFactory = temp ? 
NoOpOperationCallbackFactory.INSTANCE
-                        : new 
PrimaryIndexInstantSearchOperationCallbackFactory(
-                                ((JobEventListenerFactory) 
jobSpec.getJobletEventListenerFactory()).getJobId(),
-                                datasetId, primaryKeyFields, 
txnSubsystemProvider, ResourceType.LSM_BTREE);
+            int[] primaryKeyFields = new int[numPrimaryKeys];
+            for (int i = 0; i < numPrimaryKeys; i++) {
+                primaryKeyFields[i] = i;
             }
+
+            ISearchOperationCallbackFactory searchCallbackFactory = 
dataset.getSearchCallbackFactory(
+                    storaegComponentProvider, theIndex, jobId, 
IndexOperation.SEARCH, primaryKeyFields);
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                     DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
             RuntimeComponentsProvider rtcProvider = 
RuntimeComponentsProvider.RUNTIME_PROVIDER;
@@ -646,8 +618,13 @@
             }
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                     DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
-            ISearchOperationCallbackFactory searchCallbackFactory =
-                    temp ? NoOpOperationCallbackFactory.INSTANCE : new 
SecondaryIndexSearchOperationCallbackFactory();
+            int[] primaryKeyFields = new int[numPrimaryKeys];
+            for (int i = 0; i < numPrimaryKeys; i++) {
+                primaryKeyFields[i] = i;
+            }
+
+            ISearchOperationCallbackFactory searchCallbackFactory = 
dataset.getSearchCallbackFactory(
+                    storaegComponentProvider, secondaryIndex, jobId, 
IndexOperation.SEARCH, primaryKeyFields);
             RTreeSearchOperatorDescriptor rtreeSearchOp;
             IIndexDataflowHelperFactory indexDataflowHelperFactory = 
dataset.getIndexDataflowHelperFactory(this,
                     secondaryIndex, recType, metaType, compactionInfo.first, 
compactionInfo.second);
@@ -1011,8 +988,8 @@
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
             spPc = 
metadataProvider.splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(),
                     dataset.getDatasetName(), fileIndexName, false);
-            ISearchOperationCallbackFactory searchOpCallbackFactory =
-                    temp ? NoOpOperationCallbackFactory.INSTANCE : new 
SecondaryIndexSearchOperationCallbackFactory();
+            ISearchOperationCallbackFactory searchOpCallbackFactory = dataset
+                    .getSearchCallbackFactory(storaegComponentProvider, 
fileIndex, jobId, IndexOperation.SEARCH, null);
             // Create the operator
             ExternalLookupOperatorDescriptor op = new 
ExternalLookupOperatorDescriptor(jobSpec, adapterFactory,
                     outRecDesc, indexDataflowHelperFactory, retainInput, 
appContext.getIndexLifecycleManagerProvider(),
@@ -1091,14 +1068,11 @@
                 primaryKeyFields[i] = i;
             }
 
-            IModificationOperationCallbackFactory modificationCallbackFactory 
= temp
-                    ? new 
TempDatasetPrimaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            primaryKeyFields, txnSubsystemProvider, 
Operation.UPSERT, ResourceType.LSM_BTREE)
-                    : new UpsertOperationCallbackFactory(jobId, datasetId, 
primaryKeyFields, txnSubsystemProvider,
-                            Operation.UPSERT, ResourceType.LSM_BTREE);
+            IModificationOperationCallbackFactory modificationCallbackFactory 
= dataset.getModificationCallbackFactory(
+                    storaegComponentProvider, primaryIndex, jobId, 
IndexOperation.UPSERT, primaryKeyFields);
 
-            LockThenSearchOperationCallbackFactory searchCallbackFactory = new 
LockThenSearchOperationCallbackFactory(
-                    jobId, datasetId, primaryKeyFields, txnSubsystemProvider, 
ResourceType.LSM_BTREE);
+            ISearchOperationCallbackFactory searchCallbackFactory = 
dataset.getSearchCallbackFactory(
+                    storaegComponentProvider, primaryIndex, jobId, 
IndexOperation.UPSERT, primaryKeyFields);
 
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                     DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
@@ -1281,18 +1255,14 @@
                     getSplitProviderAndConstraints(dataset);
 
             // prepare callback
-            int datasetId = dataset.getDatasetId();
             int[] primaryKeyFields = new int[numKeys];
             for (i = 0; i < numKeys; i++) {
                 primaryKeyFields[i] = i;
             }
-            IModificationOperationCallbackFactory modificationCallbackFactory 
= temp
-                    ? new 
TempDatasetPrimaryIndexModificationOperationCallbackFactory(
-                            ((JobEventListenerFactory) 
spec.getJobletEventListenerFactory()).getJobId(), datasetId,
-                            primaryKeyFields, txnSubsystemProvider, 
Operation.get(indexOp), ResourceType.LSM_BTREE)
-                    : new PrimaryIndexModificationOperationCallbackFactory(
-                            ((JobEventListenerFactory) 
spec.getJobletEventListenerFactory()).getJobId(), datasetId,
-                            primaryKeyFields, txnSubsystemProvider, 
Operation.get(indexOp), ResourceType.LSM_BTREE);
+            IModificationOperationCallbackFactory modificationCallbackFactory 
= dataset.getModificationCallbackFactory(
+                    storaegComponentProvider, primaryIndex, jobId, indexOp, 
primaryKeyFields);
+            ISearchOperationCallbackFactory searcgCallbackFactory = dataset
+                    .getSearchCallbackFactory(storaegComponentProvider, 
primaryIndex, jobId, indexOp, primaryKeyFields);
 
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                     DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
@@ -1310,7 +1280,7 @@
                 op = new LSMTreeInsertDeleteOperatorDescriptor(spec, 
recordDesc, appContext.getStorageManager(),
                         appContext.getIndexLifecycleManagerProvider(), 
splitsAndConstraint.first, typeTraits,
                         comparatorFactories, bloomFilterKeyFields, 
fieldPermutation, indexOp, idfh, null, true,
-                        indexName, null, modificationCallbackFactory, 
NoOpOperationCallbackFactory.INSTANCE,
+                        indexName, null, modificationCallbackFactory, 
searcgCallbackFactory,
                         metadataPageManagerFactory);
             }
             return new Pair<>(op, splitsAndConstraint.second);
@@ -1483,15 +1453,10 @@
 
             // prepare callback
             JobId jobId = ((JobEventListenerFactory) 
spec.getJobletEventListenerFactory()).getJobId();
-            int datasetId = dataset.getDatasetId();
-            IModificationOperationCallbackFactory modificationCallbackFactory 
= temp
-                    ? new 
TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, 
txnSubsystemProvider, Operation.get(indexOp),
-                            ResourceType.LSM_BTREE)
-                    : new 
SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, 
txnSubsystemProvider, Operation.get(indexOp),
-                            ResourceType.LSM_BTREE);
-
+            IModificationOperationCallbackFactory modificationCallbackFactory 
= dataset.getModificationCallbackFactory(
+                    storaegComponentProvider, secondaryIndex, jobId, indexOp, 
modificationCallbackPrimaryKeyFields);
+            ISearchOperationCallbackFactory searchOpCallbackFactory = 
dataset.getSearchCallbackFactory(
+                    storaegComponentProvider, secondaryIndex, jobId, indexOp, 
modificationCallbackPrimaryKeyFields);
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                     DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
             IIndexDataflowHelperFactory idfh = 
dataset.getIndexDataflowHelperFactory(this, secondaryIndex, itemType,
@@ -1508,13 +1473,13 @@
                 op = new LSMTreeUpsertOperatorDescriptor(spec, recordDesc, 
appContext.getStorageManager(),
                         appContext.getIndexLifecycleManagerProvider(), 
splitsAndConstraint.first, typeTraits,
                         comparatorFactories, bloomFilterKeyFields, 
fieldPermutation, idfh, filterFactory, false,
-                        indexName, null, modificationCallbackFactory, 
NoOpOperationCallbackFactory.INSTANCE,
-                        prevFieldPermutation, metadataPageManagerFactory, 
dataset.getFrameOpCallbackFactory());
+                        indexName, null, modificationCallbackFactory, 
searchOpCallbackFactory, prevFieldPermutation,
+                        metadataPageManagerFactory, 
dataset.getFrameOpCallbackFactory());
             } else {
                 op = new LSMTreeInsertDeleteOperatorDescriptor(spec, 
recordDesc, appContext.getStorageManager(),
                         appContext.getIndexLifecycleManagerProvider(), 
splitsAndConstraint.first, typeTraits,
                         comparatorFactories, bloomFilterKeyFields, 
fieldPermutation, indexOp, idfh, filterFactory,
-                        false, indexName, null, modificationCallbackFactory, 
NoOpOperationCallbackFactory.INSTANCE,
+                        false, indexName, null, modificationCallbackFactory, 
searchOpCallbackFactory,
                         metadataPageManagerFactory);
             }
             return new Pair<>(op, splitsAndConstraint.second);
@@ -1641,15 +1606,10 @@
 
             // prepare callback
             JobId jobId = ((JobEventListenerFactory) 
spec.getJobletEventListenerFactory()).getJobId();
-            int datasetId = dataset.getDatasetId();
-            IModificationOperationCallbackFactory modificationCallbackFactory 
= temp
-                    ? new 
TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, 
txnSubsystemProvider, Operation.get(indexOp),
-                            ResourceType.LSM_RTREE)
-                    : new 
SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, 
txnSubsystemProvider, Operation.get(indexOp),
-                            ResourceType.LSM_RTREE);
-
+            IModificationOperationCallbackFactory modificationCallbackFactory 
= dataset.getModificationCallbackFactory(
+                    storaegComponentProvider, secondaryIndex, jobId, indexOp, 
modificationCallbackPrimaryKeyFields);
+            ISearchOperationCallbackFactory searchCallbackFactory = 
dataset.getSearchCallbackFactory(
+                    storaegComponentProvider, secondaryIndex, jobId, indexOp, 
modificationCallbackPrimaryKeyFields);
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                     DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
             IIndexDataflowHelperFactory indexDataflowHelperFactory = 
dataset.getIndexDataflowHelperFactory(this,
@@ -1666,13 +1626,13 @@
                 op = new LSMTreeUpsertOperatorDescriptor(spec, recordDesc, 
appContext.getStorageManager(),
                         appContext.getIndexLifecycleManagerProvider(), 
splitsAndConstraint.first, typeTraits,
                         comparatorFactories, null, fieldPermutation, 
indexDataflowHelperFactory, filterFactory, false,
-                        indexName, null, modificationCallbackFactory, 
NoOpOperationCallbackFactory.INSTANCE,
-                        prevFieldPermutation, metadataPageManagerFactory, 
dataset.getFrameOpCallbackFactory());
+                        indexName, null, modificationCallbackFactory, 
searchCallbackFactory, prevFieldPermutation,
+                        metadataPageManagerFactory, 
dataset.getFrameOpCallbackFactory());
             } else {
                 op = new LSMTreeInsertDeleteOperatorDescriptor(spec, 
recordDesc, appContext.getStorageManager(),
                         appContext.getIndexLifecycleManagerProvider(), 
splitsAndConstraint.first, typeTraits,
                         comparatorFactories, null, fieldPermutation, indexOp, 
indexDataflowHelperFactory, filterFactory,
-                        false, indexName, null, modificationCallbackFactory, 
NoOpOperationCallbackFactory.INSTANCE,
+                        false, indexName, null, modificationCallbackFactory, 
searchCallbackFactory,
                         metadataPageManagerFactory);
             }
             return new Pair<>(op, splitsAndConstraint.second);
@@ -1852,13 +1812,10 @@
             // prepare callback
             JobId jobId = ((JobEventListenerFactory) 
spec.getJobletEventListenerFactory()).getJobId();
             int datasetId = dataset.getDatasetId();
-            IModificationOperationCallbackFactory modificationCallbackFactory 
= temp
-                    ? new 
TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, 
txnSubsystemProvider, Operation.get(indexOp),
-                            ResourceType.LSM_INVERTED_INDEX)
-                    : new 
SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, 
txnSubsystemProvider, Operation.get(indexOp),
-                            ResourceType.LSM_INVERTED_INDEX);
+            IModificationOperationCallbackFactory modificationCallbackFactory 
= dataset.getModificationCallbackFactory(
+                    storaegComponentProvider, secondaryIndex, jobId, indexOp, 
modificationCallbackPrimaryKeyFields);
+            ISearchOperationCallbackFactory searchCallbackFactory = 
dataset.getSearchCallbackFactory(
+                    storaegComponentProvider, secondaryIndex, jobId, indexOp, 
modificationCallbackPrimaryKeyFields);
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                     DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
             IIndexDataflowHelperFactory indexDataFlowFactory = 
dataset.getIndexDataflowHelperFactory(this,
@@ -1875,14 +1832,16 @@
                 op = new LSMInvertedIndexUpsertOperatorDescriptor(spec, 
recordDesc, appContext.getStorageManager(),
                         splitsAndConstraint.first, 
appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits,
                         tokenComparatorFactories, invListsTypeTraits, 
invListComparatorFactories, tokenizerFactory,
-                        fieldPermutation, indexDataFlowFactory, filterFactory, 
modificationCallbackFactory, indexName,
+                        fieldPermutation, indexDataFlowFactory, filterFactory, 
modificationCallbackFactory,
+                        searchCallbackFactory, indexName,
                         prevFieldPermutation, metadataPageManagerFactory);
             } else {
                 op = new LSMInvertedIndexInsertDeleteOperatorDescriptor(spec, 
recordDesc,
                         appContext.getStorageManager(), 
splitsAndConstraint.first,
                         appContext.getIndexLifecycleManagerProvider(), 
tokenTypeTraits, tokenComparatorFactories,
                         invListsTypeTraits, invListComparatorFactories, 
tokenizerFactory, fieldPermutation, indexOp,
-                        indexDataFlowFactory, filterFactory, 
modificationCallbackFactory, indexName,
+                        indexDataFlowFactory, filterFactory, 
modificationCallbackFactory, searchCallbackFactory,
+                        indexName,
                         metadataPageManagerFactory);
             }
             return new Pair<>(op, splitsAndConstraint.second);
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index cd8cf3b..ff54642 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -68,6 +68,7 @@
 import 
org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexModificationOperationCallbackFactory;
 import 
org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerFactory;
 import 
org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexSearchOperationCallbackFactory;
+import 
org.apache.asterix.transaction.management.opcallbacks.TempDatasetPrimaryIndexModificationOperationCallbackFactory;
 import 
org.apache.asterix.transaction.management.opcallbacks.TempDatasetSecondaryIndexModificationOperationCallbackFactory;
 import 
org.apache.asterix.transaction.management.opcallbacks.UpsertOperationCallbackFactory;
 import org.apache.asterix.transaction.management.runtime.CommitRuntimeFactory;
@@ -519,9 +520,13 @@
             int[] primaryKeyFields) throws AlgebricksException {
         if (getDatasetDetails().isTemp()) {
             return op == IndexOperation.DELETE || op == IndexOperation.INSERT 
|| op == IndexOperation.UPSERT
-                    ? new 
TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, 
getDatasetId(),
-                            primaryKeyFields, 
componentProvider.getTransactionSubsystemProvider(), Operation.get(op),
-                            index.resourceType())
+                    ? index.isPrimaryIndex()
+                            ? new 
TempDatasetPrimaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+                                    primaryKeyFields, 
componentProvider.getTransactionSubsystemProvider(),
+                                    Operation.get(op), index.resourceType())
+                            : new 
TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, 
getDatasetId(),
+                                    primaryKeyFields, 
componentProvider.getTransactionSubsystemProvider(),
+                                    Operation.get(op), index.resourceType())
                     : NoOpOperationCallbackFactory.INSTANCE;
         } else if (index.isPrimaryIndex()) {
             return op == IndexOperation.UPSERT
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMInvertedIndexUpsertOperatorDescriptor.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMInvertedIndexUpsertOperatorDescriptor.java
index f1547a8..9d0b917 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMInvertedIndexUpsertOperatorDescriptor.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMInvertedIndexUpsertOperatorDescriptor.java
@@ -31,6 +31,7 @@
 import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
 import 
org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.api.IPageManagerFactory;
+import 
org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
 import 
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
@@ -50,11 +51,11 @@
             IBinaryComparatorFactory[] invListComparatorFactories, IBinaryTokenizerFactory tokenizerFactory,
             int[] fieldPermutation, IIndexDataflowHelperFactory dataflowHelperFactory,
             ITupleFilterFactory tupleFilterFactory, IModificationOperationCallbackFactory modificationOpCallbackFactory,
-            String indexName, int[] prevFieldPermutation, IPageManagerFactory pageManagerFactory) {
+            ISearchOperationCallbackFactory searchCallbackFactory, String indexName, int[] prevFieldPermutation, IPageManagerFactory pageManagerFactory) {
         super(spec, recDesc, storageManager, fileSplitProvider, lifecycleManagerProvider, tokenTypeTraits,
                 tokenComparatorFactories, invListsTypeTraits, invListComparatorFactories, tokenizerFactory,
                 fieldPermutation, IndexOperation.UPSERT, dataflowHelperFactory, tupleFilterFactory,
-                modificationOpCallbackFactory, indexName, pageManagerFactory);
+                modificationOpCallbackFactory, searchCallbackFactory, indexName, pageManagerFactory);
         this.prevFieldPermutation = prevFieldPermutation;
     }
 
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java
index b37ecae..e6cfd2a 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java
@@ -54,13 +54,13 @@
             IBinaryComparatorFactory[] comparatorFactories, int[] bloomFilterKeyFields, int[] fieldPermutation,
             IIndexDataflowHelperFactory dataflowHelperFactory, ITupleFilterFactory tupleFilterFactory,
             boolean isPrimary, String indexName, IMissingWriterFactory missingWriterFactory,
-            IModificationOperationCallbackFactory modificationOpCallbackProvider,
+            IModificationOperationCallbackFactory modificationOpCallbackFactory,
             ISearchOperationCallbackFactory searchOpCallbackProvider, int[] prevValuePermutation,
             IPageManagerFactory pageManagerFactory, IFrameOperationCallbackFactory frameOpCallbackFactory) {
         super(spec, recDesc, storageManager, lifecycleManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, bloomFilterKeyFields, fieldPermutation, IndexOperation.UPSERT,
                 dataflowHelperFactory, tupleFilterFactory, isPrimary, indexName, missingWriterFactory,
-                modificationOpCallbackProvider, searchOpCallbackProvider, pageManagerFactory);
+                modificationOpCallbackFactory, searchOpCallbackProvider, pageManagerFactory);
         this.prevValuePermutation = prevValuePermutation;
         this.frameOpCallbackFactory = frameOpCallbackFactory;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/SynchronousScheduler.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/SynchronousScheduler.java
index e9c0e5f..da3e986 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/SynchronousScheduler.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/SynchronousScheduler.java
@@ -18,20 +18,27 @@
  */
 package org.apache.hyracks.storage.am.lsm.common.impls;
 
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.api.IndexException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
 
-public enum SynchronousScheduler implements ILSMIOOperationScheduler {
-    INSTANCE;
+public class SynchronousScheduler implements ILSMIOOperationScheduler {
+    private static final Logger LOGGER = Logger.getLogger(SynchronousScheduler.class.getName());
+    public static final SynchronousScheduler INSTANCE = new SynchronousScheduler();
+
+    private SynchronousScheduler() {
+    }
 
     @Override
     public void scheduleOperation(ILSMIOOperation operation) throws HyracksDataException {
         try {
             operation.call();
-        } catch (IndexException e) {
-            throw new HyracksDataException(e);
+        } catch (Exception e) {
+            LOGGER.log(Level.SEVERE, "IO Operation failed", e);
+            throw HyracksDataException.create(e);
         }
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexInsertUpdateDeleteOperator.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexInsertUpdateDeleteOperator.java
index 46201d5..a342370 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexInsertUpdateDeleteOperator.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexInsertUpdateDeleteOperator.java
@@ -31,9 +31,9 @@
 import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.api.IPageManagerFactory;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
 import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
@@ -55,11 +55,12 @@
             int[] fieldPermutation, IndexOperation op, IIndexDataflowHelperFactory dataflowHelperFactory,
             ITupleFilterFactory tupleFilterFactory,
             IModificationOperationCallbackFactory modificationOpCallbackFactory,
+            ISearchOperationCallbackFactory searchCallbackFactory,
             IPageManagerFactory pageManagerFactory) {
         super(spec, 1, 1, recDesc, storageManager, fileSplitProvider, lifecycleManagerProvider, tokenTypeTraits,
                 tokenComparatorFactories, invListsTypeTraits, invListComparatorFactories, tokenizerFactory,
                 dataflowHelperFactory, tupleFilterFactory, false, false,
-                null, NoOpLocalResourceFactoryProvider.INSTANCE, NoOpOperationCallbackFactory.INSTANCE,
+                null, NoOpLocalResourceFactoryProvider.INSTANCE, searchCallbackFactory,
                 modificationOpCallbackFactory,
                 pageManagerFactory);
         this.fieldPermutation = fieldPermutation;

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1613
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Idbeacf5af01b77c5f81b59aa6acec9b13762d629
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com>

Reply via email to