http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java
new file mode 100644
index 0000000..bfc6a8e
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.declared;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider;
+import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.indexing.FilesIndexDescription;
+import org.apache.asterix.external.indexing.IndexingConstants;
+import org.apache.asterix.metadata.api.IResourceFactoryProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.metadata.utils.IndexUtil;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import org.apache.hyracks.algebricks.data.ITypeTraitProvider;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.storage.am.common.api.IMetadataPageManagerFactory;
+import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeLocalResourceFactory;
+import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeWithBuddyLocalResourceFactory;
+import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeLocalResourceFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
+import org.apache.hyracks.storage.common.IResourceFactory;
+import org.apache.hyracks.storage.common.IStorageManager;
+
+public class BTreeResourceFactoryProvider implements IResourceFactoryProvider {
+
+    public static final BTreeResourceFactoryProvider INSTANCE = new BTreeResourceFactoryProvider();
+
+    private BTreeResourceFactoryProvider() {
+    }
+
+    @Override
+    public IResourceFactory getResourceFactory(MetadataProvider mdProvider, Dataset dataset, Index index,
+            ARecordType recordType, ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory,
+            Map<String, String> mergePolicyProperties, ITypeTraits[] filterTypeTraits,
+            IBinaryComparatorFactory[] filterCmpFactories) throws AlgebricksException {
+        int[] filterFields = IndexUtil.getFilterFields(dataset, index, filterTypeTraits);
+        int[] btreeFields = IndexUtil.getBtreeFieldsIfFiltered(dataset, index);
+        IStorageComponentProvider storageComponentProvider = mdProvider.getStorageComponentProvider();
+        ITypeTraits[] typeTraits = getTypeTraits(mdProvider, dataset, index, recordType, metaType);
+        IBinaryComparatorFactory[] cmpFactories = getCmpFactories(mdProvider, dataset, index, recordType, metaType);
+        int[] bloomFilterFields = getBloomFilterFields(dataset, index);
+        boolean durable = !dataset.isTemp();
+        double bloomFilterFalsePositiveRate = mdProvider.getStorageProperties().getBloomFilterFalsePositiveRate();
+        ILSMOperationTrackerFactory opTrackerFactory = dataset.getIndexOperationTrackerFactory(index);
+        ILSMIOOperationCallbackFactory ioOpCallbackFactory = dataset.getIoOperationCallbackFactory(index);
+        IStorageManager storageManager = storageComponentProvider.getStorageManager();
+        IMetadataPageManagerFactory metadataPageManagerFactory =
+                storageComponentProvider.getMetadataPageManagerFactory();
+        ILSMIOOperationSchedulerProvider ioSchedulerProvider =
+                storageComponentProvider.getIoOperationSchedulerProvider();
+        switch (dataset.getDatasetType()) {
+            case EXTERNAL:
+                return index.getIndexName().equals(IndexingConstants.getFilesIndexName(dataset.getDatasetName()))
+                        ? new ExternalBTreeLocalResourceFactory(storageManager, typeTraits, cmpFactories,
+                                filterTypeTraits, filterCmpFactories, filterFields, opTrackerFactory,
+                                ioOpCallbackFactory, metadataPageManagerFactory, ioSchedulerProvider,
+                                mergePolicyFactory, mergePolicyProperties, durable, bloomFilterFields,
+                                bloomFilterFalsePositiveRate, false, btreeFields)
+                        : new ExternalBTreeWithBuddyLocalResourceFactory(storageManager, typeTraits, cmpFactories,
+                                filterTypeTraits, filterCmpFactories, filterFields, opTrackerFactory,
+                                ioOpCallbackFactory, metadataPageManagerFactory, ioSchedulerProvider,
+                                mergePolicyFactory, mergePolicyProperties, durable, bloomFilterFields,
+                                bloomFilterFalsePositiveRate, false, btreeFields);
+            case INTERNAL:
+                AsterixVirtualBufferCacheProvider vbcProvider =
+                        new AsterixVirtualBufferCacheProvider(dataset.getDatasetId());
+                return new LSMBTreeLocalResourceFactory(storageManager, typeTraits, cmpFactories, filterTypeTraits,
+                        filterCmpFactories, filterFields, opTrackerFactory, ioOpCallbackFactory,
+                        metadataPageManagerFactory, vbcProvider, ioSchedulerProvider, mergePolicyFactory,
+                        mergePolicyProperties, durable, bloomFilterFields, bloomFilterFalsePositiveRate,
+                        index.isPrimaryIndex(), btreeFields);
+            default:
+                throw new CompilationException(ErrorCode.COMPILATION_UNKNOWN_DATASET_TYPE,
+                        dataset.getDatasetType().toString());
+        }
+    }
+
+    private static ITypeTraits[] getTypeTraits(MetadataProvider metadataProvider, Dataset dataset, Index index,
+            ARecordType recordType, ARecordType metaType) throws AlgebricksException {
+        ITypeTraits[] primaryTypeTraits = dataset.getPrimaryTypeTraits(metadataProvider, recordType, metaType);
+        if (index.isPrimaryIndex()) {
+            return primaryTypeTraits;
+        } else if (dataset.getDatasetType() == DatasetType.EXTERNAL
+                && index.getIndexName().equals(IndexingConstants.getFilesIndexName(dataset.getDatasetName()))) {
+            return FilesIndexDescription.EXTERNAL_FILE_INDEX_TYPE_TRAITS;
+        }
+        int numPrimaryKeys = dataset.getPrimaryKeys().size();
+        int numSecondaryKeys = index.getKeyFieldNames().size();
+        ITypeTraitProvider typeTraitProvider = metadataProvider.getStorageComponentProvider().getTypeTraitProvider();
+        ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numSecondaryKeys + numPrimaryKeys];
+        for (int i = 0; i < numSecondaryKeys; i++) {
+            ARecordType sourceType;
+            List<Integer> keySourceIndicators = index.getKeyFieldSourceIndicators();
+            if (keySourceIndicators == null || keySourceIndicators.get(i) == 0) {
+                sourceType = recordType;
+            } else {
+                sourceType = metaType;
+            }
+            Pair<IAType, Boolean> keyTypePair = Index.getNonNullableOpenFieldType(index.getKeyFieldTypes().get(i),
+                    index.getKeyFieldNames().get(i), sourceType);
+            IAType keyType = keyTypePair.first;
+            secondaryTypeTraits[i] = typeTraitProvider.getTypeTrait(keyType);
+        }
+        // Add type traits for the primary index fields.
+        for (int i = 0; i < numPrimaryKeys; i++) {
+            secondaryTypeTraits[numSecondaryKeys + i] = primaryTypeTraits[i];
+        }
+        return secondaryTypeTraits;
+    }
+
+    private static IBinaryComparatorFactory[] getCmpFactories(MetadataProvider metadataProvider, Dataset dataset,
+            Index index, ARecordType recordType, ARecordType metaType) throws AlgebricksException {
+        IBinaryComparatorFactory[] primaryCmpFactories =
+                dataset.getPrimaryComparatorFactories(metadataProvider, recordType, metaType);
+        if (index.isPrimaryIndex()) {
+            return primaryCmpFactories;
+        } else if (dataset.getDatasetType() == DatasetType.EXTERNAL
+                && index.getIndexName().equals(IndexingConstants.getFilesIndexName(dataset.getDatasetName()))) {
+            return FilesIndexDescription.FILES_INDEX_COMP_FACTORIES;
+        }
+        int numPrimaryKeys = dataset.getPrimaryKeys().size();
+        int numSecondaryKeys = index.getKeyFieldNames().size();
+        IBinaryComparatorFactoryProvider cmpFactoryProvider =
+                metadataProvider.getStorageComponentProvider().getComparatorFactoryProvider();
+        IBinaryComparatorFactory[] secondaryCmpFactories =
+                new IBinaryComparatorFactory[numSecondaryKeys + numPrimaryKeys];
+        for (int i = 0; i < numSecondaryKeys; i++) {
+            ARecordType sourceType;
+            List<Integer> keySourceIndicators = index.getKeyFieldSourceIndicators();
+            if (keySourceIndicators == null || keySourceIndicators.get(i) == 0) {
+                sourceType = recordType;
+            } else {
+                sourceType = metaType;
+            }
+            Pair<IAType, Boolean> keyTypePair = Index.getNonNullableOpenFieldType(index.getKeyFieldTypes().get(i),
+                    index.getKeyFieldNames().get(i), sourceType);
+            IAType keyType = keyTypePair.first;
+            secondaryCmpFactories[i] = cmpFactoryProvider.getBinaryComparatorFactory(keyType, true);
+        }
+        // Add comparator factories for the primary index fields.
+        for (int i = 0; i < numPrimaryKeys; i++) {
+            secondaryCmpFactories[numSecondaryKeys + i] = primaryCmpFactories[i];
+        }
+        return secondaryCmpFactories;
+    }
+
+    private static int[] getBloomFilterFields(Dataset dataset, Index index) throws AlgebricksException {
+        if (index.isPrimaryIndex()) {
+            return dataset.getPrimaryBloomFilterFields();
+        } else if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+            if (index.getIndexName().equals(IndexingConstants.getFilesIndexName(dataset.getDatasetName()))) {
+                return FilesIndexDescription.BLOOM_FILTER_FIELDS;
+            } else {
+                return new int[] { index.getKeyFieldNames().size() };
+            }
+        }
+        int numKeys = index.getKeyFieldNames().size();
+        int[] bloomFilterKeyFields = new int[numKeys];
+        for (int i = 0; i < numKeys; i++) {
+            bloomFilterKeyFields[i] = i;
+        }
+        return bloomFilterKeyFields;
+    }
+}
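
For context, the provider above centralizes what used to be per-call-site resource wiring: given the dataset, the index, and the merge/filter configuration, it hands back the matching IResourceFactory. A minimal sketch of a call site, assuming the surrounding variables (mdProvider, dataset, index, and so on) are already in scope; their names here are illustrative, and only the getResourceFactory signature comes from the class above:

    IResourceFactory resourceFactory = BTreeResourceFactoryProvider.INSTANCE.getResourceFactory(
            mdProvider, dataset, index, recordType, metaType,
            mergePolicyFactory, mergePolicyProperties, filterTypeTraits, filterCmpFactories);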

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
index 9b2d4c4..b13f4c2 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
@@ -27,7 +27,6 @@ import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.InternalDatasetDetails;
-import org.apache.asterix.metadata.utils.DatasetUtil;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.IAType;
@@ -60,7 +59,7 @@ public class LoadableDataSource extends DataSource {
         this.targetDataset = targetDataset;
         this.adapter = adapter;
         this.adapterProperties = properties;
-        partitioningKeys = DatasetUtil.getPartitioningKeys(targetDataset);
+        partitioningKeys = targetDataset.getPrimaryKeys();
         ARecordType recType = (ARecordType) itemType;
         isPKAutoGenerated = ((InternalDatasetDetails) targetDataset.getDatasetDetails()).isAutogenerated();
         if (isPKAutoGenerated) {
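
The one-line change above swaps a static helper for an accessor on the Dataset entity itself, a pattern this commit applies throughout. A before/after sketch (the List<List<String>> element type is an assumption inferred from how partitioning keys are used elsewhere in this diff):

    // before: a static utility walks the dataset details
    List<List<String>> partitioningKeys = DatasetUtil.getPartitioningKeys(targetDataset);
    // after: the entity exposes its own primary keys
    List<List<String>> partitioningKeys = targetDataset.getPrimaryKeys();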

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 774b73e..5934f5e 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -32,7 +32,6 @@ import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.config.StorageProperties;
 import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.common.dataflow.LSMInvertedIndexInsertDeleteOperatorDescriptor;
 import org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.CompilationException;
@@ -69,8 +68,8 @@ import org.apache.asterix.metadata.entities.Feed;
 import org.apache.asterix.metadata.entities.FeedConnection;
 import org.apache.asterix.metadata.entities.FeedPolicyEntity;
 import org.apache.asterix.metadata.entities.Index;
-import org.apache.asterix.metadata.entities.InternalDatasetDetails;
 import org.apache.asterix.metadata.feeds.FeedMetadataUtil;
+import org.apache.asterix.metadata.lock.ExternalDatasetsRegistry;
 import org.apache.asterix.metadata.lock.LockList;
 import org.apache.asterix.metadata.lock.MetadataLockManager;
 import org.apache.asterix.metadata.utils.DatasetUtil;
@@ -84,10 +83,9 @@ import org.apache.asterix.om.utils.NonTaggedFormatUtil;
 import org.apache.asterix.runtime.base.AsterixTupleFilterFactory;
 import org.apache.asterix.runtime.formats.FormatUtils;
 import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
-import org.apache.asterix.runtime.operators.LSMInvertedIndexUpsertOperatorDescriptor;
-import org.apache.asterix.runtime.operators.LSMTreeUpsertOperatorDescriptor;
+import org.apache.asterix.runtime.operators.LSMPrimaryUpsertOperatorDescriptor;
+import org.apache.asterix.runtime.operators.LSMSecondaryUpsertOperatorDescriptor;
 import org.apache.asterix.runtime.utils.ClusterStateManager;
-import org.apache.asterix.runtime.utils.RuntimeComponentsProvider;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -124,7 +122,6 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.io.FileSplit;
 import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
 import org.apache.hyracks.data.std.primitive.ShortPointable;
 import org.apache.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer;
 import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
@@ -135,13 +132,13 @@ import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFa
 import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
 import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
-import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexBulkLoadOperatorDescriptor;
 import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
 import org.apache.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescriptor;
+import org.apache.hyracks.storage.common.IStorageManager;
 
 public class MetadataProvider implements IMetadataProvider<DataSourceId, String> {
 
@@ -450,7 +447,6 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             boolean lowKeyInclusive, boolean highKeyInclusive, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes)
             throws AlgebricksException {
         boolean isSecondary = true;
-        int numSecondaryKeys = 0;
         try {
             Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
                     dataset.getDatasetName(), dataset.getDatasetName());
@@ -459,62 +455,10 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             }
             Index theIndex = isSecondary ? MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
                     dataset.getDatasetName(), indexName) : primaryIndex;
-            int numPrimaryKeys = DatasetUtil.getPartitioningKeys(dataset).size();
+            int numPrimaryKeys = dataset.getPrimaryKeys().size();
             RecordDescriptor outputRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
-            int[] bloomFilterKeyFields;
-            ITypeTraits[] typeTraits;
-            IBinaryComparatorFactory[] comparatorFactories;
-
-            ARecordType itemType =
-                    (ARecordType) this.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
-            ARecordType metaType = null;
-            List<Integer> primaryKeyIndicators = null;
-            if (dataset.hasMetaPart()) {
-                metaType =
-                        (ARecordType) findType(dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName());
-                primaryKeyIndicators = ((InternalDatasetDetails) dataset.getDatasetDetails()).getKeySourceIndicator();
-            }
-
-            ITypeTraits[] filterTypeTraits = DatasetUtil.computeFilterTypeTraits(dataset, itemType);
-            int[] filterFields;
-            int[] btreeFields;
-            if (isSecondary) {
-                numSecondaryKeys = theIndex.getKeyFieldNames().size();
-                bloomFilterKeyFields = new int[numSecondaryKeys];
-                for (int i = 0; i < numSecondaryKeys; i++) {
-                    bloomFilterKeyFields[i] = i;
-                }
-                Pair<IBinaryComparatorFactory[], ITypeTraits[]> comparatorFactoriesAndTypeTraits =
-                        getComparatorFactoriesAndTypeTraitsOfSecondaryBTreeIndex(theIndex.getKeyFieldNames(),
-                                theIndex.getKeyFieldTypes(), DatasetUtil.getPartitioningKeys(dataset), itemType,
-                                dataset.getDatasetType(), dataset.hasMetaPart(), primaryKeyIndicators,
-                                theIndex.getKeyFieldSourceIndicators(), metaType);
-                comparatorFactories = comparatorFactoriesAndTypeTraits.first;
-                typeTraits = comparatorFactoriesAndTypeTraits.second;
-                if (filterTypeTraits != null) {
-                    filterFields = new int[1];
-                    filterFields[0] = numSecondaryKeys + numPrimaryKeys;
-                    btreeFields = new int[numSecondaryKeys + numPrimaryKeys];
-                    for (int k = 0; k < btreeFields.length; k++) {
-                        btreeFields[k] = k;
-                    }
-                }
-
-            } else {
-                bloomFilterKeyFields = new int[numPrimaryKeys];
-                for (int i = 0; i < numPrimaryKeys; i++) {
-                    bloomFilterKeyFields[i] = i;
-                }
-                // get meta item type
-                ARecordType metaItemType = DatasetUtil.getMetaType(this, dataset);
-                typeTraits = DatasetUtil.computeTupleTypeTraits(dataset, itemType, metaItemType);
-                comparatorFactories = DatasetUtil.computeKeysBinaryComparatorFactories(dataset, itemType, metaItemType,
-                        context.getBinaryComparatorFactoryProvider());
-            }
-
-            ICcApplicationContext appContext = (ICcApplicationContext) context.getAppContext();
-            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
-            spPc = getSplitProviderAndConstraints(dataset, theIndex.getIndexName());
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc =
+                    getSplitProviderAndConstraints(dataset, theIndex.getIndexName());
             int[] primaryKeyFields = new int[numPrimaryKeys];
             for (int i = 0; i < numPrimaryKeys; i++) {
                 primaryKeyFields[i] = i;
@@ -522,27 +466,20 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
 
             ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory(
                     storaegComponentProvider, theIndex, jobId, IndexOperation.SEARCH, primaryKeyFields);
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
-                    DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
-            RuntimeComponentsProvider rtcProvider = RuntimeComponentsProvider.RUNTIME_PROVIDER;
+            IStorageManager storageManager = getStorageComponentProvider().getStorageManager();
+            IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(storageManager, spPc.first);
             BTreeSearchOperatorDescriptor btreeSearchOp;
             if (dataset.getDatasetType() == DatasetType.INTERNAL) {
-                btreeSearchOp = new BTreeSearchOperatorDescriptor(jobSpec, outputRecDesc,
-                        appContext.getStorageManager(), appContext.getIndexLifecycleManagerProvider(), spPc.first,
-                        typeTraits, comparatorFactories, bloomFilterKeyFields, lowKeyFields, highKeyFields,
-                        lowKeyInclusive, highKeyInclusive,
-                        dataset.getIndexDataflowHelperFactory(this, theIndex, itemType, metaType, compactionInfo.first,
-                                compactionInfo.second),
-                        retainInput, retainMissing, context.getMissingWriterFactory(), searchCallbackFactory,
-                        minFilterFieldIndexes, maxFilterFieldIndexes, metadataPageManagerFactory);
+                btreeSearchOp = new BTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, lowKeyFields, highKeyFields,
+                        lowKeyInclusive, highKeyInclusive, indexHelperFactory, retainInput, retainMissing,
+                        context.getMissingWriterFactory(), searchCallbackFactory, minFilterFieldIndexes,
+                        maxFilterFieldIndexes, false);
             } else {
-                IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory(this,
-                        theIndex, itemType, metaType, compactionInfo.first, compactionInfo.second);
-                btreeSearchOp = new ExternalBTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, rtcProvider,
-                        rtcProvider, spPc.first, typeTraits, comparatorFactories, bloomFilterKeyFields, lowKeyFields,
-                        highKeyFields, lowKeyInclusive, highKeyInclusive, indexDataflowHelperFactory, retainInput,
-                        retainMissing, context.getMissingWriterFactory(), searchCallbackFactory,
-                        metadataPageManagerFactory);
+                btreeSearchOp = new ExternalBTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, lowKeyFields,
+                        highKeyFields, lowKeyInclusive, highKeyInclusive, indexHelperFactory, retainInput,
+                        retainMissing, context.getMissingWriterFactory(), searchCallbackFactory, minFilterFieldIndexes,
+                        maxFilterFieldIndexes,
+                        ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, this));
             }
             return new Pair<>(btreeSearchOp, spPc.second);
         } catch (MetadataException me) {
@@ -555,75 +492,16 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             JobGenContext context, boolean retainInput, boolean retainMissing, Dataset dataset, String indexName,
             int[] keyFields, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes) throws AlgebricksException {
         try {
-            ARecordType recType = (ARecordType) findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
-            int numPrimaryKeys = DatasetUtil.getPartitioningKeys(dataset).size();
+            int numPrimaryKeys = dataset.getPrimaryKeys().size();
             Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
                     dataset.getDatasetName(), indexName);
             if (secondaryIndex == null) {
                 throw new AlgebricksException(
                         "Code generation error: no index " + indexName + " for dataset " + dataset.getDatasetName());
             }
-            List<List<String>> secondaryKeyFields = secondaryIndex.getKeyFieldNames();
-            List<IAType> secondaryKeyTypes = secondaryIndex.getKeyFieldTypes();
-            int numSecondaryKeys = secondaryKeyFields.size();
-            if (numSecondaryKeys != 1) {
-                throw new AlgebricksException(
-                        "Cannot use " + numSecondaryKeys + " fields as a key for the R-tree index. "
-                                + "There can be only one field as a key for the R-tree index.");
-            }
-            Pair<IAType, Boolean> keyTypePair =
-                    Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0), secondaryKeyFields.get(0), recType);
-            IAType keyType = keyTypePair.first;
-            if (keyType == null) {
-                throw new AlgebricksException("Could not find field " + secondaryKeyFields.get(0) + " in the schema.");
-            }
-            int numDimensions = NonTaggedFormatUtil.getNumDimensions(keyType.getTypeTag());
-            int numNestedSecondaryKeyFields = numDimensions * 2;
-            IPrimitiveValueProviderFactory[] valueProviderFactories =
-                    new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
-            for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
-                valueProviderFactories[i] = primitiveValueProviderFactory;
-            }
-
             RecordDescriptor outputRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
-            // IS NOT THE VARIABLE BELOW ALWAYS = 0 ??
-            int keysStartIndex = outputRecDesc.getFieldCount() - numNestedSecondaryKeyFields - numPrimaryKeys;
-            if (retainInput) {
-                keysStartIndex -= numNestedSecondaryKeyFields;
-            }
-            IBinaryComparatorFactory[] comparatorFactories = JobGenHelper.variablesToAscBinaryComparatorFactories(
-                    outputVars, keysStartIndex, numNestedSecondaryKeyFields, typeEnv, context);
-            ITypeTraits[] typeTraits = JobGenHelper.variablesToTypeTraits(outputVars, keysStartIndex,
-                    numNestedSecondaryKeyFields + numPrimaryKeys, typeEnv, context);
-            ICcApplicationContext appContext = (ICcApplicationContext) context.getAppContext();
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc =
                     getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName());
-            ARecordType metaType = null;
-            if (dataset.hasMetaPart()) {
-                metaType =
-                        (ARecordType) findType(dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName());
-            }
-
-            IBinaryComparatorFactory[] primaryComparatorFactories = DatasetUtil.computeKeysBinaryComparatorFactories(
-                    dataset, recType, metaType, context.getBinaryComparatorFactoryProvider());
-            int[] btreeFields = new int[primaryComparatorFactories.length];
-            for (int i = 0; i < btreeFields.length; i++) {
-                btreeFields[i] = i + numNestedSecondaryKeyFields;
-            }
-
-            ITypeTraits[] filterTypeTraits = DatasetUtil.computeFilterTypeTraits(dataset, recType);
-            int[] filterFields;
-            int[] rtreeFields;
-            if (filterTypeTraits != null) {
-                filterFields = new int[1];
-                filterFields[0] = numNestedSecondaryKeyFields + numPrimaryKeys;
-                rtreeFields = new int[numNestedSecondaryKeyFields + numPrimaryKeys];
-                for (int i = 0; i < rtreeFields.length; i++) {
-                    rtreeFields[i] = i;
-                }
-            }
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
-                    DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
             int[] primaryKeyFields = new int[numPrimaryKeys];
             for (int i = 0; i < numPrimaryKeys; i++) {
                 primaryKeyFields[i] = i;
@@ -632,21 +510,18 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory(
                     storaegComponentProvider, secondaryIndex, jobId, IndexOperation.SEARCH, primaryKeyFields);
             RTreeSearchOperatorDescriptor rtreeSearchOp;
-            IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory(this,
-                    secondaryIndex, recType, metaType, compactionInfo.first, compactionInfo.second);
+            IIndexDataflowHelperFactory indexDataflowHelperFactory =
+                    new IndexDataflowHelperFactory(storaegComponentProvider.getStorageManager(), spPc.first);
             if (dataset.getDatasetType() == DatasetType.INTERNAL) {
-                rtreeSearchOp = new RTreeSearchOperatorDescriptor(jobSpec, outputRecDesc,
-                        appContext.getStorageManager(), appContext.getIndexLifecycleManagerProvider(), spPc.first,
-                        typeTraits, comparatorFactories, keyFields, indexDataflowHelperFactory, retainInput,
-                        retainMissing, context.getMissingWriterFactory(), searchCallbackFactory, minFilterFieldIndexes,
-                        maxFilterFieldIndexes, metadataPageManagerFactory);
+                rtreeSearchOp = new RTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, keyFields, true, true,
+                        indexDataflowHelperFactory, retainInput, retainMissing, context.getMissingWriterFactory(),
+                        searchCallbackFactory, minFilterFieldIndexes, maxFilterFieldIndexes, false);
             } else {
                 // Create the operator
-                rtreeSearchOp = new ExternalRTreeSearchOperatorDescriptor(jobSpec, outputRecDesc,
-                        appContext.getStorageManager(), appContext.getIndexLifecycleManagerProvider(), spPc.first,
-                        typeTraits, comparatorFactories, keyFields, indexDataflowHelperFactory, retainInput,
-                        retainMissing, context.getMissingWriterFactory(), searchCallbackFactory,
-                        metadataPageManagerFactory);
+                rtreeSearchOp = new ExternalRTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, keyFields, true, true,
+                        indexDataflowHelperFactory, retainInput, retainMissing, context.getMissingWriterFactory(),
+                        searchCallbackFactory, minFilterFieldIndexes, maxFilterFieldIndexes,
+                        ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, this));
             }
 
             return new Pair<>(rtreeSearchOp, spPc.second);
@@ -696,19 +571,16 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             JobSpecification spec) throws AlgebricksException {
         String dataverseName = dataSource.getId().getDataverseName();
         String datasetName = dataSource.getId().getDatasourceName();
-
         Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
         int numKeys = keys.size();
         int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 : 1;
 
         // move key fields to front
         int[] fieldPermutation = new int[numKeys + 1 + numFilterFields];
-        int[] bloomFilterKeyFields = new int[numKeys];
         int i = 0;
         for (LogicalVariable varKey : keys) {
             int idx = propagatedSchema.findVariable(varKey);
             fieldPermutation[i] = idx;
-            bloomFilterKeyFields[i] = i;
             i++;
         }
         fieldPermutation[numKeys] = propagatedSchema.findVariable(payload);
@@ -720,38 +592,19 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
         try {
             boolean temp = dataset.getDatasetDetails().isTemp();
             isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
-            Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-                    dataset.getDatasetName(), dataset.getDatasetName());
-            ARecordType metaType = dataset.hasMetaPart()
-                    ? (ARecordType) findType(dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName())
-                    : null;
-            String itemTypeName = dataset.getItemTypeName();
-            ARecordType itemType = (ARecordType) MetadataManager.INSTANCE
-                    .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName).getDatatype();
-            ITypeTraits[] typeTraits = DatasetUtil.computeTupleTypeTraits(dataset, itemType, null);
-            IBinaryComparatorFactory[] comparatorFactories = DatasetUtil.computeKeysBinaryComparatorFactories(dataset,
-                    itemType, metaType, context.getBinaryComparatorFactoryProvider());
-
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
                     getSplitProviderAndConstraints(dataset);
-            ICcApplicationContext appContext = (ICcApplicationContext) context.getAppContext();
             long numElementsHint = getCardinalityPerPartitionHint(dataset);
-
             // TODO
             // figure out the right behavior of the bulkload and then give the
             // right callback
             // (ex. what's the expected behavior when there is an error during
             // bulkload?)
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
-                    DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
+            IIndexDataflowHelperFactory indexHelperFactory =
+                    new IndexDataflowHelperFactory(storaegComponentProvider.getStorageManager(), splitsAndConstraint.first);
             TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad =
-                    new TreeIndexBulkLoadOperatorDescriptor(spec, null, appContext.getStorageManager(),
-                            appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
-                            comparatorFactories, bloomFilterKeyFields, fieldPermutation,
-                            GlobalConfig.DEFAULT_TREE_FILL_FACTOR,
-                            false, numElementsHint, true, dataset.getIndexDataflowHelperFactory(this, primaryIndex,
-                                    itemType, metaType, compactionInfo.first, compactionInfo.second),
-                            metadataPageManagerFactory);
+                    new TreeIndexBulkLoadOperatorDescriptor(spec, null, fieldPermutation,
+                            GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, true, indexHelperFactory);
             return new Pair<>(btreeBulkLoad, splitsAndConstraint.second);
         } catch (MetadataException me) {
             throw new AlgebricksException(me);
@@ -762,19 +615,19 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(
             IDataSource<DataSourceId> dataSource, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
             List<LogicalVariable> keys, LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields,
-            List<LogicalVariable> additionalNonFilteringFields, RecordDescriptor recordDesc, JobGenContext context,
+            List<LogicalVariable> additionalNonFilteringFields, RecordDescriptor inputRecordDesc, JobGenContext context,
             JobSpecification spec, boolean bulkload) throws AlgebricksException {
         return getInsertOrDeleteRuntime(IndexOperation.INSERT, dataSource, propagatedSchema, keys, payload,
-                additionalNonKeyFields, recordDesc, context, spec, bulkload, additionalNonFilteringFields);
+                additionalNonKeyFields, inputRecordDesc, context, spec, bulkload, additionalNonFilteringFields);
     }
 
     @Override
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getDeleteRuntime(
             IDataSource<DataSourceId> dataSource, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
             List<LogicalVariable> keys, LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields,
-            RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec) throws AlgebricksException {
+            RecordDescriptor inputRecordDesc, JobGenContext context, JobSpecification spec) throws AlgebricksException {
         return getInsertOrDeleteRuntime(IndexOperation.DELETE, dataSource, propagatedSchema, keys, payload,
-                additionalNonKeyFields, recordDesc, context, spec, false, null);
+                additionalNonKeyFields, inputRecordDesc, context, spec, false, null);
     }
 
     @Override
@@ -947,12 +800,6 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
         return ClusterStateManager.INSTANCE.getClusterLocations();
     }
 
-    public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForFilesIndex(
-            String dataverseName, String datasetName, String targetIdxName, boolean create) throws AlgebricksException {
-        return SplitsAndConstraintsUtil.getFilesIndexSplitProviderAndConstraints(
-                findDataset(dataverseName, datasetName), mdTxnCtx, targetIdxName, create);
-    }
-
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDataLookupRuntime(
             JobSpecification jobSpec, Dataset dataset, int[] ridIndexes, boolean retainInput,
             IVariableTypeEnvironment typeEnv, IOperatorSchema opSchema, JobGenContext context,
@@ -962,45 +809,26 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             ARecordType itemType =
                     (ARecordType) MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(),
                             dataset.getDataverseName(), dataset.getItemTypeName()).getDatatype();
-            ARecordType metaType = null;
-            if (dataset.hasMetaPart()) {
-                metaType =
-                        (ARecordType) MetadataManager.INSTANCE
-                                .getDatatype(metadataProvider.getMetadataTxnContext(),
-                                        dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName())
-                                .getDatatype();
-            }
             ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
             LookupAdapterFactory<?> adapterFactory = AdapterFactoryProvider.getLookupAdapterFactory(
                     getApplicationContext().getServiceContext(), datasetDetails.getProperties(), itemType, ridIndexes,
                     retainInput, retainMissing, context.getMissingWriterFactory());
-
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo;
-            try {
-                compactionInfo = DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
-            } catch (MetadataException e) {
-                throw new AlgebricksException(" Unabel to create merge policy factory for external dataset", e);
-            }
-
-            String fileIndexName = BTreeDataflowHelperFactoryProvider.externalFileIndexName(dataset);
+            String fileIndexName = IndexingConstants.getFilesIndexName(dataset.getDatasetName());
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc =
+                    metadataProvider.getSplitProviderAndConstraints(dataset, fileIndexName);
             Index fileIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
                     dataset.getDatasetName(), fileIndexName);
             // Create the file index data flow helper
-            IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory(this,
-                    fileIndex, itemType, metaType, compactionInfo.first, compactionInfo.second);
+            IIndexDataflowHelperFactory indexDataflowHelperFactory =
+                    new IndexDataflowHelperFactory(storaegComponentProvider.getStorageManager(), spPc.first);
             // Create the out record descriptor, appContext and fileSplitProvider for the files index
             RecordDescriptor outRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
-            ICcApplicationContext appContext = (ICcApplicationContext) context.getAppContext();
-            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
-            spPc = metadataProvider.splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(),
-                    dataset.getDatasetName(), fileIndexName, false);
             ISearchOperationCallbackFactory searchOpCallbackFactory = dataset
                     .getSearchCallbackFactory(storaegComponentProvider, fileIndex, jobId, IndexOperation.SEARCH, null);
             // Create the operator
             ExternalLookupOperatorDescriptor op = new ExternalLookupOperatorDescriptor(jobSpec, adapterFactory,
-                    outRecDesc, indexDataflowHelperFactory, retainInput, appContext.getIndexLifecycleManagerProvider(),
-                    appContext.getStorageManager(), spPc.first, searchOpCallbackFactory, retainMissing,
-                    context.getMissingWriterFactory(), metadataPageManagerFactory);
+                    outRecDesc, indexDataflowHelperFactory, searchOpCallbackFactory,
+                    ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, this));
             return new Pair<>(op, spPc.second);
         } catch (Exception e) {
             throw new AlgebricksException(e);
@@ -1021,7 +849,6 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
         }
         boolean temp = dataset.getDatasetDetails().isTemp();
         isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
-
        int numKeys = primaryKeys.size();
         int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 : 1;
         int numOfAdditionalFields = additionalNonFilterFields == null ? 0 : additionalNonFilterFields.size();
@@ -1053,17 +880,11 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
         try {
             Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
                     dataset.getDatasetName(), dataset.getDatasetName());
-            String indexName = primaryIndex.getIndexName();
-
             String itemTypeName = dataset.getItemTypeName();
             String itemTypeDataverseName = dataset.getItemTypeDataverseName();
             ARecordType itemType = (ARecordType) MetadataManager.INSTANCE
                     .getDatatype(mdTxnCtx, itemTypeDataverseName, itemTypeName).getDatatype();
             ARecordType metaItemType = DatasetUtil.getMetaType(this, dataset);
-            ITypeTraits[] typeTraits = DatasetUtil.computeTupleTypeTraits(dataset, itemType, metaItemType);
-            ICcApplicationContext appContext = (ICcApplicationContext) context.getAppContext();
-            IBinaryComparatorFactory[] comparatorFactories = DatasetUtil.computeKeysBinaryComparatorFactories(dataset,
-                    itemType, metaItemType, context.getBinaryComparatorFactoryProvider());
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
                     getSplitProviderAndConstraints(dataset);
             // prepare callback
@@ -1078,19 +899,14 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
 
             IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
                     storaegComponentProvider, primaryIndex, jobId, IndexOperation.UPSERT, primaryKeyFields);
-
             ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory(
                     storaegComponentProvider, primaryIndex, jobId, IndexOperation.UPSERT, primaryKeyFields);
-
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
-                    DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
-            IIndexDataflowHelperFactory idfh = dataset.getIndexDataflowHelperFactory(this, primaryIndex, itemType,
-                    metaItemType, compactionInfo.first, compactionInfo.second);
-            LSMTreeUpsertOperatorDescriptor op;
-
+            IIndexDataflowHelperFactory idfh =
+                    new IndexDataflowHelperFactory(storaegComponentProvider.getStorageManager(), splitsAndConstraint.first);
+            LSMPrimaryUpsertOperatorDescriptor op;
             ITypeTraits[] outputTypeTraits =
                     new ITypeTraits[recordDesc.getFieldCount() + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
-            ISerializerDeserializer[] outputSerDes = new ISerializerDeserializer[recordDesc.getFieldCount()
+            ISerializerDeserializer<?>[] outputSerDes = new ISerializerDeserializer[recordDesc.getFieldCount()
                     + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
 
             // add the previous record first
@@ -1124,15 +940,10 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
                 outputTypeTraits[j + f] = recordDesc.getTypeTraits()[j];
                 outputSerDes[j + f] = recordDesc.getFields()[j];
             }
-
             RecordDescriptor outputRecordDesc = new RecordDescriptor(outputSerDes, outputTypeTraits);
-            op = new LSMTreeUpsertOperatorDescriptor(spec, outputRecordDesc, appContext.getStorageManager(),
-                    appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
-                    comparatorFactories, bloomFilterKeyFields, fieldPermutation, idfh, null, true, indexName,
-                    context.getMissingWriterFactory(), modificationCallbackFactory, searchCallbackFactory, null,
-                    metadataPageManagerFactory, dataset.getFrameOpCallbackFactory(), hasSecondaries);
-            op.setType(itemType);
-            op.setFilterIndex(fieldIdx);
+            op = new LSMPrimaryUpsertOperatorDescriptor(spec, outputRecordDesc, fieldPermutation, idfh,
+                    context.getMissingWriterFactory(), modificationCallbackFactory, searchCallbackFactory,
+                    dataset.getFrameOpCallbackFactory(), numKeys, itemType, fieldIdx, hasSecondaries);
             return new Pair<>(op, splitsAndConstraint.second);
 
         } catch (MetadataException me) {
@@ -1212,7 +1023,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
 
     private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertOrDeleteRuntime(IndexOperation indexOp,
             IDataSource<DataSourceId> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
-            LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields, RecordDescriptor recordDesc,
+            LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields, RecordDescriptor inputRecordDesc,
             JobGenContext context, JobSpecification spec, boolean bulkload,
             List<LogicalVariable> additionalNonFilteringFields) throws AlgebricksException {
 
@@ -1250,15 +1061,6 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
         try {
             Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
                     dataset.getDatasetName(), dataset.getDatasetName());
-            String indexName = primaryIndex.getIndexName();
-            ARecordType itemType = (ARecordType) MetadataManager.INSTANCE
-                    .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), dataset.getItemTypeName()).getDatatype();
-            ARecordType metaItemType = DatasetUtil.getMetaType(this, dataset);
-            ITypeTraits[] typeTraits = DatasetUtil.computeTupleTypeTraits(dataset, itemType, metaItemType);
-
-            ICcApplicationContext appContext = (ICcApplicationContext) context.getAppContext();
-            IBinaryComparatorFactory[] comparatorFactories = DatasetUtil.computeKeysBinaryComparatorFactories(dataset,
-                    itemType, metaItemType, context.getBinaryComparatorFactoryProvider());
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
                     getSplitProviderAndConstraints(dataset);
 
@@ -1269,27 +1071,16 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             }
             IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
                     storaegComponentProvider, primaryIndex, jobId, indexOp, primaryKeyFields);
-            ISearchOperationCallbackFactory searchCallbackFactory = dataset
-                    .getSearchCallbackFactory(storaegComponentProvider, primaryIndex, jobId, indexOp, primaryKeyFields);
-
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
-                    DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
-            IIndexDataflowHelperFactory idfh = dataset.getIndexDataflowHelperFactory(this, primaryIndex, itemType,
-                    metaItemType, compactionInfo.first, compactionInfo.second);
+            IIndexDataflowHelperFactory idfh =
+                    new IndexDataflowHelperFactory(storaegComponentProvider.getStorageManager(), splitsAndConstraint.first);
             IOperatorDescriptor op;
             if (bulkload) {
                 long numElementsHint = getCardinalityPerPartitionHint(dataset);
-                op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(),
-                        appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
-                        comparatorFactories, bloomFilterKeyFields, fieldPermutation,
-                        GlobalConfig.DEFAULT_TREE_FILL_FACTOR, true, numElementsHint, true, idfh,
-                        metadataPageManagerFactory);
+                op = new TreeIndexBulkLoadOperatorDescriptor(spec, inputRecordDesc, fieldPermutation,
+                        GlobalConfig.DEFAULT_TREE_FILL_FACTOR, true, numElementsHint, true, idfh);
             } else {
-                op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(),
-                        appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
-                        comparatorFactories, bloomFilterKeyFields, fieldPermutation, indexOp, idfh, null, true,
-                        indexName, null, modificationCallbackFactory, searchCallbackFactory,
-                        metadataPageManagerFactory);
+                op = new LSMTreeInsertDeleteOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, indexOp, idfh,
+                        null, true, modificationCallbackFactory);
             }
             return new Pair<>(op, splitsAndConstraint.second);
         } catch (MetadataException me) {
@@ -1301,9 +1092,10 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             IndexOperation indexOp, IDataSourceIndex<String, DataSourceId> dataSourceIndex,
             IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv,
             List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
-            List<LogicalVariable> additionalNonKeyFields, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
-            JobGenContext context, JobSpecification spec, boolean bulkload, List<LogicalVariable> prevSecondaryKeys,
-            LogicalVariable prevAdditionalFilteringKey) throws AlgebricksException {
+            List<LogicalVariable> additionalNonKeyFields, ILogicalExpression filterExpr,
+            RecordDescriptor inputRecordDesc, JobGenContext context, JobSpecification spec, boolean bulkload,
+            List<LogicalVariable> prevSecondaryKeys, LogicalVariable prevAdditionalFilteringKey)
+            throws AlgebricksException {
         String indexName = dataSourceIndex.getId();
         String dataverseName = dataSourceIndex.getDataSource().getId().getDataverseName();
         String datasetName = dataSourceIndex.getDataSource().getId().getDatasourceName();
@@ -1326,18 +1118,18 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
         switch (secondaryIndex.getIndexType()) {
             case BTREE:
                 return getBTreeRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
-                        secondaryKeys, additionalNonKeyFields, filterFactory, recordDesc, context, spec, indexOp,
+                        secondaryKeys, additionalNonKeyFields, filterFactory, inputRecordDesc, context, spec, indexOp,
                         bulkload, prevSecondaryKeys, prevAdditionalFilteringKeys);
             case RTREE:
                 return getRTreeRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
-                        secondaryKeys, additionalNonKeyFields, filterFactory, recordDesc, context, spec, indexOp,
+                        secondaryKeys, additionalNonKeyFields, filterFactory, inputRecordDesc, context, spec, indexOp,
                         bulkload, prevSecondaryKeys, prevAdditionalFilteringKeys);
             case SINGLE_PARTITION_WORD_INVIX:
             case SINGLE_PARTITION_NGRAM_INVIX:
             case LENGTH_PARTITIONED_WORD_INVIX:
             case LENGTH_PARTITIONED_NGRAM_INVIX:
                 return getInvertedIndexRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
-                        secondaryKeys, additionalNonKeyFields, filterFactory, recordDesc, context, spec, indexOp,
+                        secondaryKeys, additionalNonKeyFields, filterFactory, inputRecordDesc, context, spec, indexOp,
                         secondaryIndex.getIndexType(), bulkload, prevSecondaryKeys, prevAdditionalFilteringKeys);
             default:
                 throw new AlgebricksException(
@@ -1349,7 +1141,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
     private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBTreeRuntime(String dataverseName,
             String datasetName, String indexName, IOperatorSchema propagatedSchema, List<LogicalVariable> primaryKeys,
             List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
-            AsterixTupleFilterFactory filterFactory, RecordDescriptor recordDesc, JobGenContext context,
+            AsterixTupleFilterFactory filterFactory, RecordDescriptor inputRecordDesc, JobGenContext context,
             JobSpecification spec, IndexOperation indexOp, boolean bulkload, List<LogicalVariable> prevSecondaryKeys,
             List<LogicalVariable> prevAdditionalFilteringKeys) throws AlgebricksException {
         Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
@@ -1361,14 +1153,12 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
 
         // generate field permutations
         int[] fieldPermutation = new int[numKeys + numFilterFields];
-        int[] bloomFilterKeyFields = new int[secondaryKeys.size()];
         int[] modificationCallbackPrimaryKeyFields = new int[primaryKeys.size()];
         int i = 0;
         int j = 0;
         for (LogicalVariable varKey : secondaryKeys) {
             int idx = propagatedSchema.findVariable(varKey);
             fieldPermutation[i] = idx;
-            bloomFilterKeyFields[i] = i;
             i++;
         }
         for (LogicalVariable varKey : primaryKeys) {
@@ -1405,90 +1195,29 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
                 prevFieldPermutation[numKeys] = idx;
             }
         }
-
-        String itemTypeName = dataset.getItemTypeName();
-        ARecordType itemType;
         try {
-            itemType = (ARecordType) MetadataManager.INSTANCE
-                    .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName).getDatatype();
-            validateRecordType(itemType);
-            ARecordType metaType =
-                    dataset.hasMetaPart()
-                            ? (ARecordType) MetadataManager.INSTANCE.getDatatype(mdTxnCtx,
-                                    dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName()).getDatatype()
-                            : null;
-
             // Index parameters.
             Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
                     dataset.getDatasetName(), indexName);
-
-            ITypeTraits[] filterTypeTraits = DatasetUtil.computeFilterTypeTraits(dataset, itemType);
-            int[] filterFields;
-            int[] btreeFields;
-            if (filterTypeTraits != null) {
-                filterFields = new int[1];
-                filterFields[0] = numKeys;
-                btreeFields = new int[numKeys];
-                for (int k = 0; k < btreeFields.length; k++) {
-                    btreeFields[k] = k;
-                }
-            }
-
-            List<List<String>> secondaryKeyNames = secondaryIndex.getKeyFieldNames();
-            List<IAType> secondaryKeyTypes = secondaryIndex.getKeyFieldTypes();
-            ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
-            IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numKeys];
-            for (i = 0; i < secondaryKeys.size(); ++i) {
-                Pair<IAType, Boolean> keyPairType =
-                        Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(i), secondaryKeyNames.get(i), itemType);
-                IAType keyType = keyPairType.first;
-                comparatorFactories[i] =
-                        BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true);
-                typeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-            }
-            List<List<String>> partitioningKeys = DatasetUtil.getPartitioningKeys(dataset);
-            for (List<String> partitioningKey : partitioningKeys) {
-                IAType keyType = itemType.getSubFieldType(partitioningKey);
-                comparatorFactories[i] =
-                        BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true);
-                typeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-                ++i;
-            }
-
-            ICcApplicationContext appContext = (ICcApplicationContext) context.getAppContext();
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
                     getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName());
-
             // prepare callback
             JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
             IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
                     storaegComponentProvider, secondaryIndex, jobId, indexOp, modificationCallbackPrimaryKeyFields);
-            ISearchOperationCallbackFactory searchOpCallbackFactory = dataset.getSearchCallbackFactory(
-                    storaegComponentProvider, secondaryIndex, jobId, indexOp, modificationCallbackPrimaryKeyFields);
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
-                    DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
-            IIndexDataflowHelperFactory idfh = dataset.getIndexDataflowHelperFactory(this, secondaryIndex, itemType,
-                    metaType, compactionInfo.first, compactionInfo.second);
+            IIndexDataflowHelperFactory idfh =
+                    new IndexDataflowHelperFactory(storaegComponentProvider.getStorageManager(), splitsAndConstraint.first);
             IOperatorDescriptor op;
             if (bulkload) {
                 long numElementsHint = getCardinalityPerPartitionHint(dataset);
-                op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(),
-                        appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
-                        comparatorFactories, bloomFilterKeyFields, fieldPermutation,
-                        GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, idfh,
-                        metadataPageManagerFactory);
+                op = new TreeIndexBulkLoadOperatorDescriptor(spec, inputRecordDesc, fieldPermutation,
+                        GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, idfh);
             } else if (indexOp == IndexOperation.UPSERT) {
-                op = new LSMTreeUpsertOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(),
-                        appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
-                        comparatorFactories, bloomFilterKeyFields, fieldPermutation, idfh, filterFactory, false,
-                        indexName, null, modificationCallbackFactory, searchOpCallbackFactory, prevFieldPermutation,
-                        metadataPageManagerFactory, dataset.getFrameOpCallbackFactory(), true);
+                op = new LSMSecondaryUpsertOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, idfh,
+                        filterFactory, modificationCallbackFactory, prevFieldPermutation);
             } else {
-                op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(),
-                        appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
-                        comparatorFactories, bloomFilterKeyFields, fieldPermutation, indexOp, idfh, filterFactory,
-                        false, indexName, null, modificationCallbackFactory, searchOpCallbackFactory,
-                        metadataPageManagerFactory);
+                op = new LSMTreeInsertDeleteOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, indexOp, idfh,
+                        filterFactory, false, modificationCallbackFactory);
             }
             return new Pair<>(op, splitsAndConstraint.second);
         } catch (Exception e) {
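
The secondary B-tree runtime now chooses between three much smaller operator
constructors. A condensed sketch of that three-way selection, using only the
signatures visible in the "+" lines of the hunk above and assuming the
surrounding variables are in scope exactly as in getBTreeRuntime:

    IOperatorDescriptor op;
    if (bulkload) {
        // fill factor, verifyInput=false, cardinality hint, checkIfEmptyIndex=false
        op = new TreeIndexBulkLoadOperatorDescriptor(spec, inputRecordDesc, fieldPermutation,
                GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, idfh);
    } else if (indexOp == IndexOperation.UPSERT) {
        // prevFieldPermutation carries the old secondary-key image for the delete half of the upsert
        op = new LSMSecondaryUpsertOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, idfh,
                filterFactory, modificationCallbackFactory, prevFieldPermutation);
    } else {
        // the boolean false marks this as a secondary (non-primary) index
        op = new LSMTreeInsertDeleteOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, indexOp,
                idfh, filterFactory, false, modificationCallbackFactory);
    }
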
@@ -1524,8 +1253,6 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             int numSecondaryKeys = dimension * 2;
             int numPrimaryKeys = primaryKeys.size();
             int numKeys = numSecondaryKeys + numPrimaryKeys;
-            ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
-            IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys];
 
             int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 : 1;
             int[] fieldPermutation = new int[numKeys + numFilterFields];
@@ -1573,75 +1300,27 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
                     prevFieldPermutation[numKeys] = idx;
                 }
             }
-
-            IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag());
-            IPrimitiveValueProviderFactory[] valueProviderFactories =
-                    new IPrimitiveValueProviderFactory[numSecondaryKeys];
-            for (i = 0; i < numSecondaryKeys; i++) {
-                comparatorFactories[i] =
-                        BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(nestedKeyType, true);
-                typeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
-                valueProviderFactories[i] = primitiveValueProviderFactory;
-            }
-            List<List<String>> partitioningKeys = DatasetUtil.getPartitioningKeys(dataset);
-            for (List<String> partitioningKey : partitioningKeys) {
-                IAType keyType = recType.getSubFieldType(partitioningKey);
-                typeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-                ++i;
-            }
-            ARecordType metaItemType = DatasetUtil.getMetaType(this, dataset);
-            IBinaryComparatorFactory[] primaryComparatorFactories = DatasetUtil.computeKeysBinaryComparatorFactories(
-                    dataset, recType, metaItemType, context.getBinaryComparatorFactoryProvider());
-            ICcApplicationContext appContext = (ICcApplicationContext) context.getAppContext();
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
                     getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName());
-            int[] btreeFields = new int[primaryComparatorFactories.length];
-            for (int k = 0; k < btreeFields.length; k++) {
-                btreeFields[k] = k + numSecondaryKeys;
-            }
-
-            ITypeTraits[] filterTypeTraits = DatasetUtil.computeFilterTypeTraits(dataset, recType);
-            int[] filterFields;
-            int[] rtreeFields;
-            if (filterTypeTraits != null) {
-                filterFields = new int[1];
-                filterFields[0] = numSecondaryKeys + numPrimaryKeys;
-                rtreeFields = new int[numSecondaryKeys + numPrimaryKeys];
-                for (int k = 0; k < rtreeFields.length; k++) {
-                    rtreeFields[k] = k;
-                }
-            }
 
             // prepare callback
             JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
             IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
                     storaegComponentProvider, secondaryIndex, jobId, indexOp, modificationCallbackPrimaryKeyFields);
-            ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory(
-                    storaegComponentProvider, secondaryIndex, jobId, indexOp, modificationCallbackPrimaryKeyFields);
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
-                    DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
-            IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory(this,
-                    secondaryIndex, recType, metaItemType, compactionInfo.first, compactionInfo.second);
+            IIndexDataflowHelperFactory indexDataflowHelperFactory =
+                    new IndexDataflowHelperFactory(storaegComponentProvider.getStorageManager(), splitsAndConstraint.first);
             IOperatorDescriptor op;
             if (bulkload) {
                 long numElementsHint = getCardinalityPerPartitionHint(dataset);
-                op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(),
-                        appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
-                        primaryComparatorFactories, btreeFields, fieldPermutation,
+                op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation,
                         GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false,
-                        indexDataflowHelperFactory, metadataPageManagerFactory);
+                        indexDataflowHelperFactory);
             } else if (indexOp == IndexOperation.UPSERT) {
-                op = new LSMTreeUpsertOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(),
-                        appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
-                        comparatorFactories, null, fieldPermutation, indexDataflowHelperFactory, filterFactory, false,
-                        indexName, null, modificationCallbackFactory, searchCallbackFactory, prevFieldPermutation,
-                        metadataPageManagerFactory, dataset.getFrameOpCallbackFactory(), true);
+                op = new LSMSecondaryUpsertOperatorDescriptor(spec, recordDesc, fieldPermutation,
+                        indexDataflowHelperFactory, filterFactory, modificationCallbackFactory, prevFieldPermutation);
             } else {
-                op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(),
-                        appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
-                        comparatorFactories, null, fieldPermutation, indexOp, indexDataflowHelperFactory, filterFactory,
-                        false, indexName, null, modificationCallbackFactory, searchCallbackFactory,
-                        metadataPageManagerFactory);
+                op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, fieldPermutation, indexOp,
+                        indexDataflowHelperFactory, filterFactory, false, modificationCallbackFactory);
             }
             return new Pair<>(op, splitsAndConstraint.second);
         } catch (MetadataException e) {
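
The R-tree hunk mirrors the B-tree one: the computation of type traits,
comparator factories, and primitive value providers disappears from
MetadataProvider entirely. What remains index-specific in these runtimes is
essentially the field permutation that maps logical variables to input column
positions. A sketch of that remaining work, assuming the primary-key loop has
the same shape as the secondary-key loop shown in the context lines earlier
in this diff:

    int[] fieldPermutation = new int[numKeys + numFilterFields];
    int i = 0;
    for (LogicalVariable varKey : secondaryKeys) {
        // column position of each secondary key in the propagated schema
        fieldPermutation[i++] = propagatedSchema.findVariable(varKey);
    }
    for (LogicalVariable varKey : primaryKeys) {
        fieldPermutation[i++] = propagatedSchema.findVariable(varKey);
    }
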
@@ -1733,87 +1412,10 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
                 prevFieldPermutation[numKeys] = idx;
             }
         }
-
-        String itemTypeName = dataset.getItemTypeName();
-        IAType itemType;
         try {
-            itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName)
-                    .getDatatype();
-            validateRecordType(itemType);
-            ARecordType recType = (ARecordType) itemType;
-
             // Index parameters.
             Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
                     dataset.getDatasetName(), indexName);
-
-            List<List<String>> secondaryKeyExprs = secondaryIndex.getKeyFieldNames();
-            List<IAType> secondaryKeyTypes = secondaryIndex.getKeyFieldTypes();
-
-            int numTokenFields = 0;
-
-            // SecondaryKeys.size() can be two if it comes from the bulkload.
-            // In this case, [token, number of token] are the secondaryKeys.
-            if (!isPartitioned || (secondaryKeys.size() > 1)) {
-                numTokenFields = secondaryKeys.size();
-            } else if (isPartitioned && (secondaryKeys.size() == 1)) {
-                numTokenFields = secondaryKeys.size() + 1;
-            }
-
-            ARecordType metaItemType = DatasetUtil.getMetaType(this, dataset);
-            ITypeTraits[] tokenTypeTraits = new ITypeTraits[numTokenFields];
-            ITypeTraits[] invListsTypeTraits = new ITypeTraits[primaryKeys.size()];
-            IBinaryComparatorFactory[] tokenComparatorFactories = new IBinaryComparatorFactory[numTokenFields];
-            IBinaryComparatorFactory[] invListComparatorFactories = DatasetUtil.computeKeysBinaryComparatorFactories(
-                    dataset, recType, metaItemType, context.getBinaryComparatorFactoryProvider());
-
-            IAType secondaryKeyType;
-            Pair<IAType, Boolean> keyPairType =
-                    Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0), secondaryKeyExprs.get(0), recType);
-            secondaryKeyType = keyPairType.first;
-
-            List<List<String>> partitioningKeys = DatasetUtil.getPartitioningKeys(dataset);
-
-            i = 0;
-            for (List<String> partitioningKey : partitioningKeys) {
-                IAType keyType = recType.getSubFieldType(partitioningKey);
-                invListsTypeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-                ++i;
-            }
-
-            tokenComparatorFactories[0] = NonTaggedFormatUtil.getTokenBinaryComparatorFactory(secondaryKeyType);
-            tokenTypeTraits[0] = NonTaggedFormatUtil.getTokenTypeTrait(secondaryKeyType);
-            if (isPartitioned) {
-                // The partitioning field is hardcoded to be a short *without*
-                // an Asterix type tag.
-                tokenComparatorFactories[1] = PointableBinaryComparatorFactory.of(ShortPointable.FACTORY);
-                tokenTypeTraits[1] = ShortPointable.TYPE_TRAITS;
-            }
-            IBinaryTokenizerFactory tokenizerFactory = NonTaggedFormatUtil.getBinaryTokenizerFactory(
-                    secondaryKeyType.getTypeTag(), indexType, secondaryIndex.getGramLength());
-
-            ITypeTraits[] filterTypeTraits = DatasetUtil.computeFilterTypeTraits(dataset, recType);
-
-            int[] filterFields;
-            int[] invertedIndexFields;
-            int[] filterFieldsForNonBulkLoadOps;
-            int[] invertedIndexFieldsForNonBulkLoadOps;
-            if (filterTypeTraits != null) {
-                filterFields = new int[1];
-                filterFields[0] = numTokenFields + primaryKeys.size();
-                invertedIndexFields = new int[numTokenFields + primaryKeys.size()];
-                for (int k = 0; k < invertedIndexFields.length; k++) {
-                    invertedIndexFields[k] = k;
-                }
-                filterFieldsForNonBulkLoadOps = new int[numFilterFields];
-                //for non-bulk-loads, there is only <SK,PK,F> in the incoming tuples
-                filterFieldsForNonBulkLoadOps[0] = numKeys;
-                invertedIndexFieldsForNonBulkLoadOps = new int[numKeys];
-                for (int k = 0; k < invertedIndexFieldsForNonBulkLoadOps.length; k++) {
-                    invertedIndexFieldsForNonBulkLoadOps[k] = k;
-                }
-            }
-
-            ICcApplicationContext appContext = (ICcApplicationContext) context.getAppContext();
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
                     getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName());
 
@@ -1821,33 +1423,19 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
             IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
                     storaegComponentProvider, secondaryIndex, jobId, indexOp, modificationCallbackPrimaryKeyFields);
-            ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory(
-                    storaegComponentProvider, secondaryIndex, jobId, indexOp, modificationCallbackPrimaryKeyFields);
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
-                    DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
-            IIndexDataflowHelperFactory indexDataFlowFactory = dataset.getIndexDataflowHelperFactory(this,
-                    secondaryIndex, recType, metaItemType, compactionInfo.first, compactionInfo.second);
+            IIndexDataflowHelperFactory indexDataFlowFactory =
+                    new IndexDataflowHelperFactory(storaegComponentProvider.getStorageManager(), splitsAndConstraint.first);
             IOperatorDescriptor op;
             if (bulkload) {
                 long numElementsHint = getCardinalityPerPartitionHint(dataset);
-                op = new LSMInvertedIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation, false,
-                        numElementsHint, false, appContext.getStorageManager(), splitsAndConstraint.first,
-                        appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
-                        invListsTypeTraits, invListComparatorFactories, tokenizerFactory, indexDataFlowFactory,
-                        metadataPageManagerFactory);
+                op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation,
+                        GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, indexDataFlowFactory);
             } else if (indexOp == IndexOperation.UPSERT) {
-                op = new LSMInvertedIndexUpsertOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(),
-                        splitsAndConstraint.first, appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits,
-                        tokenComparatorFactories, invListsTypeTraits, invListComparatorFactories, tokenizerFactory,
-                        fieldPermutation, indexDataFlowFactory, filterFactory, modificationCallbackFactory,
-                        searchCallbackFactory, indexName, prevFieldPermutation, metadataPageManagerFactory);
+                op = new LSMSecondaryUpsertOperatorDescriptor(spec, recordDesc, fieldPermutation, indexDataFlowFactory,
+                        filterFactory, modificationCallbackFactory, prevFieldPermutation);
             } else {
-                op = new LSMInvertedIndexInsertDeleteOperatorDescriptor(spec, recordDesc,
-                        appContext.getStorageManager(), splitsAndConstraint.first,
-                        appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
-                        invListsTypeTraits, invListComparatorFactories, tokenizerFactory, fieldPermutation, indexOp,
-                        indexDataFlowFactory, filterFactory, modificationCallbackFactory, searchCallbackFactory,
-                        indexName, metadataPageManagerFactory);
+                op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, fieldPermutation, indexOp,
+                        indexDataFlowFactory, filterFactory, false, modificationCallbackFactory);
             }
             return new Pair<>(op, splitsAndConstraint.second);
         } catch (Exception e) {
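
Most strikingly, the inverted-index runtime no longer needs the dedicated
LSMInvertedIndex* operator descriptors: once tokenizers, token type traits,
and comparator factories are carried by the index's persisted local resource,
the generic tree operators suffice. The replacement, as it appears in the "+"
lines of the hunk above, repeated here only to attach clarifying comments
(not new code):

    if (bulkload) {
        // same generic bulk-load operator as the B-tree and R-tree paths
        op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation,
                GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, indexDataFlowFactory);
    } else if (indexOp == IndexOperation.UPSERT) {
        op = new LSMSecondaryUpsertOperatorDescriptor(spec, recordDesc, fieldPermutation, indexDataFlowFactory,
                filterFactory, modificationCallbackFactory, prevFieldPermutation);
    } else {
        op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, fieldPermutation, indexOp,
                indexDataFlowFactory, filterFactory, false, modificationCallbackFactory);
    }
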
@@ -1973,7 +1561,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypeEntries.get(0),
                     secondaryKeyExprs.get(0), recType);
             secondaryKeyType = keyPairType.first;
-            List<List<String>> partitioningKeys = DatasetUtil.getPartitioningKeys(dataset);
+            List<List<String>> partitioningKeys = dataset.getPrimaryKeys();
+            List<List<String>> partitioningKeys = dataset.getPrimaryKeys();
             i = 0;
             for (List<String> partitioningKey : partitioningKeys) {
                 IAType keyType = recType.getSubFieldType(partitioningKey);
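
The final hunk is a small API cleanup: the partitioning keys are read
directly from the Dataset entity instead of going through the DatasetUtil
helper. Assuming the two calls return the same key list, the swap is simply:

    // before: List<List<String>> partitioningKeys = DatasetUtil.getPartitioningKeys(dataset);
    List<List<String>> partitioningKeys = dataset.getPrimaryKeys();
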
