http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/InvertedIndexResourceFactoryProvider.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/InvertedIndexResourceFactoryProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/InvertedIndexResourceFactoryProvider.java
new file mode 100644
index 0000000..362305e
--- /dev/null
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/InvertedIndexResourceFactoryProvider.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.utils;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.config.DatasetConfig.IndexType;
+import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider;
+import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.metadata.api.IResourceFactoryProvider;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.utils.NonTaggedFormatUtil;
+import org.apache.asterix.om.utils.RecordUtil;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import org.apache.hyracks.data.std.primitive.ShortPointable;
+import org.apache.hyracks.storage.am.common.api.IMetadataPageManagerFactory;
+import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
+import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
+import 
org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexLocalResourceFactory;
+import 
org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
+import org.apache.hyracks.storage.common.IResourceFactory;
+import org.apache.hyracks.storage.common.IStorageManager;
+
+public class InvertedIndexResourceFactoryProvider implements 
IResourceFactoryProvider {
+    public static final InvertedIndexResourceFactoryProvider INSTANCE = new 
InvertedIndexResourceFactoryProvider();
+
+    private InvertedIndexResourceFactoryProvider() {
+    }
+
+    @Override
+    public IResourceFactory getResourceFactory(MetadataProvider mdProvider, 
Dataset dataset, Index index,
+            ARecordType recordType, ARecordType metaType, 
ILSMMergePolicyFactory mergePolicyFactory,
+            Map<String, String> mergePolicyProperties, ITypeTraits[] 
filterTypeTraits,
+            IBinaryComparatorFactory[] filterCmpFactories) throws 
AlgebricksException {
+        // Get basic info
+        List<List<String>> primaryKeys = dataset.getPrimaryKeys();
+        List<List<String>> secondaryKeys = index.getKeyFieldNames();
+        List<String> filterFieldName = DatasetUtil.getFilterField(dataset);
+        int numPrimaryKeys = primaryKeys.size();
+        int numSecondaryKeys = secondaryKeys.size();
+        // Validate
+        if (dataset.getDatasetType() != DatasetType.INTERNAL) {
+            throw new 
CompilationException(ErrorCode.COMPILATION_INDEX_TYPE_NOT_SUPPORTED_FOR_DATASET_TYPE,
+                    index.getIndexType().name(), dataset.getDatasetType());
+        }
+        if (numPrimaryKeys > 1) {
+            throw new AsterixException("Cannot create inverted index on 
dataset with composite primary key.");
+        }
+        if (numSecondaryKeys > 1) {
+            throw new AsterixException("Cannot create composite inverted index 
on multiple fields.");
+        }
+        boolean isPartitioned = index.getIndexType() == 
IndexType.LENGTH_PARTITIONED_WORD_INVIX
+                || index.getIndexType() == 
IndexType.LENGTH_PARTITIONED_NGRAM_INVIX;
+        int numTokenKeyPairFields = (!isPartitioned) ? 1 + numPrimaryKeys : 2 
+ numPrimaryKeys;
+        int[] invertedIndexFields = null;
+        int[] secondaryFilterFieldsForNonBulkLoadOps = null;
+        int[] invertedIndexFieldsForNonBulkLoadOps = null;
+        int[] secondaryFilterFields = null;
+        if (filterFieldName != null) {
+            invertedIndexFields = new int[numTokenKeyPairFields];
+            for (int i = 0; i < invertedIndexFields.length; i++) {
+                invertedIndexFields[i] = i;
+            }
+            secondaryFilterFieldsForNonBulkLoadOps = new 
int[filterFieldName.size()];
+            secondaryFilterFieldsForNonBulkLoadOps[0] = numSecondaryKeys + 
numPrimaryKeys;
+            invertedIndexFieldsForNonBulkLoadOps = new int[numSecondaryKeys + 
numPrimaryKeys];
+            for (int i = 0; i < invertedIndexFieldsForNonBulkLoadOps.length; 
i++) {
+                invertedIndexFieldsForNonBulkLoadOps[i] = i;
+            }
+            secondaryFilterFields = new int[filterFieldName.size()];
+            secondaryFilterFields[0] = numTokenKeyPairFields - numPrimaryKeys 
+ numPrimaryKeys;
+        }
+        IStorageComponentProvider storageComponentProvider = 
mdProvider.getStorageComponentProvider();
+        IStorageManager storageManager = 
storageComponentProvider.getStorageManager();
+        ILSMOperationTrackerFactory opTrackerFactory = 
dataset.getIndexOperationTrackerFactory(index);
+        ILSMIOOperationCallbackFactory ioOpCallbackFactory = 
dataset.getIoOperationCallbackFactory(index);
+        IMetadataPageManagerFactory metadataPageManagerFactory =
+                storageComponentProvider.getMetadataPageManagerFactory();
+        AsterixVirtualBufferCacheProvider vbcProvider = new 
AsterixVirtualBufferCacheProvider(dataset.getDatasetId());
+        ILSMIOOperationSchedulerProvider ioSchedulerProvider =
+                storageComponentProvider.getIoOperationSchedulerProvider();
+        boolean durable = !dataset.isTemp();
+        double bloomFilterFalsePositiveRate = 
mdProvider.getStorageProperties().getBloomFilterFalsePositiveRate();
+        ITypeTraits[] typeTraits = getInvListTypeTraits(mdProvider, dataset, 
recordType, metaType);
+        IBinaryComparatorFactory[] cmpFactories =
+                getInvListComparatorFactories(mdProvider, dataset, recordType, 
metaType);
+        ITypeTraits[] tokenTypeTraits = getTokenTypeTraits(dataset, index, 
recordType, metaType);
+        IBinaryComparatorFactory[] tokenCmpFactories =
+                getTokenComparatorFactories(dataset, index, recordType, 
metaType);
+        IBinaryTokenizerFactory tokenizerFactory = 
getTokenizerFactory(dataset, index, recordType, metaType);
+        return new LSMInvertedIndexLocalResourceFactory(storageManager, 
typeTraits, cmpFactories, filterTypeTraits,
+                filterCmpFactories, secondaryFilterFields, opTrackerFactory, 
ioOpCallbackFactory,
+                metadataPageManagerFactory, vbcProvider, ioSchedulerProvider, 
mergePolicyFactory, mergePolicyProperties,
+                durable, tokenTypeTraits, tokenCmpFactories, tokenizerFactory, 
isPartitioned, invertedIndexFields,
+                secondaryFilterFieldsForNonBulkLoadOps, 
invertedIndexFieldsForNonBulkLoadOps,
+                bloomFilterFalsePositiveRate);
+    }
+
+    private static ITypeTraits[] getInvListTypeTraits(MetadataProvider 
metadataProvider, Dataset dataset,
+            ARecordType recordType, ARecordType metaType) throws 
AlgebricksException {
+        ITypeTraits[] primaryTypeTraits = 
dataset.getPrimaryTypeTraits(metadataProvider, recordType, metaType);
+        ITypeTraits[] typeTraits = new ITypeTraits[primaryTypeTraits.length - 
1];
+        for (int i = 0; i < typeTraits.length; i++) {
+            typeTraits[i] = primaryTypeTraits[i];
+        }
+        return typeTraits;
+    }
+
+    private static IBinaryComparatorFactory[] 
getInvListComparatorFactories(MetadataProvider metadataProvider,
+            Dataset dataset, ARecordType recordType, ARecordType metaType) 
throws AlgebricksException {
+        return dataset.getPrimaryComparatorFactories(metadataProvider, 
recordType, metaType);
+    }
+
+    private static ITypeTraits[] getTokenTypeTraits(Dataset dataset, Index 
index, ARecordType recordType,
+            ARecordType metaType) throws AlgebricksException {
+        int numPrimaryKeys = dataset.getPrimaryKeys().size();
+        int numSecondaryKeys = index.getKeyFieldNames().size();
+        IndexType indexType = index.getIndexType();
+        // Sanity checks.
+        if (numPrimaryKeys > 1) {
+            throw new 
CompilationException(ErrorCode.COMPILATION_ILLEGAL_INDEX_FOR_DATASET_WITH_COMPOSITE_PRIMARY_INDEX,
+                    indexType, 
RecordUtil.toFullyQualifiedName(dataset.getDataverseName(), 
dataset.getDatasetName()));
+        }
+        if (numSecondaryKeys > 1) {
+            throw new 
CompilationException(ErrorCode.COMPILATION_ILLEGAL_INDEX_NUM_OF_FIELD, 
numSecondaryKeys,
+                    indexType, 1);
+        }
+        boolean isPartitioned = indexType == 
IndexType.LENGTH_PARTITIONED_WORD_INVIX
+                || indexType == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX;
+        ARecordType sourceType;
+        List<Integer> keySourceIndicators = 
index.getKeyFieldSourceIndicators();
+        if (keySourceIndicators == null || keySourceIndicators.get(0) == 0) {
+            sourceType = recordType;
+        } else {
+            sourceType = metaType;
+        }
+        Pair<IAType, Boolean> keyTypePair = 
Index.getNonNullableOpenFieldType(index.getKeyFieldTypes().get(0),
+                index.getKeyFieldNames().get(0), sourceType);
+        IAType secondaryKeyType = keyTypePair.first;
+        int numTokenFields = (!isPartitioned) ? numSecondaryKeys : 
numSecondaryKeys + 1;
+        ITypeTraits[] tokenTypeTraits = new ITypeTraits[numTokenFields];
+        tokenTypeTraits[0] = 
NonTaggedFormatUtil.getTokenTypeTrait(secondaryKeyType);
+        if (isPartitioned) {
+            // The partitioning field is hardcoded to be a short *without* an 
Asterix type tag.
+            tokenTypeTraits[1] = ShortPointable.TYPE_TRAITS;
+        }
+        return tokenTypeTraits;
+    }
+
+    private static IBinaryComparatorFactory[] 
getTokenComparatorFactories(Dataset dataset, Index index,
+            ARecordType recordType, ARecordType metaType) throws 
AlgebricksException {
+        int numPrimaryKeys = dataset.getPrimaryKeys().size();
+        int numSecondaryKeys = index.getKeyFieldNames().size();
+        IndexType indexType = index.getIndexType();
+        // Sanity checks.
+        if (numPrimaryKeys > 1) {
+            throw new 
CompilationException(ErrorCode.COMPILATION_ILLEGAL_INDEX_FOR_DATASET_WITH_COMPOSITE_PRIMARY_INDEX,
+                    indexType, 
RecordUtil.toFullyQualifiedName(dataset.getDataverseName(), 
dataset.getDatasetName()));
+        }
+        if (numSecondaryKeys > 1) {
+            throw new 
CompilationException(ErrorCode.COMPILATION_ILLEGAL_INDEX_NUM_OF_FIELD, 
numSecondaryKeys,
+                    indexType, 1);
+        }
+        boolean isPartitioned = indexType == 
IndexType.LENGTH_PARTITIONED_WORD_INVIX
+                || indexType == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX;
+        List<Integer> keySourceIndicators = 
index.getKeyFieldSourceIndicators();
+        ARecordType sourceType;
+        if (keySourceIndicators == null || keySourceIndicators.get(0) == 0) {
+            sourceType = recordType;
+        } else {
+            sourceType = metaType;
+        }
+        Pair<IAType, Boolean> keyTypePair = 
Index.getNonNullableOpenFieldType(index.getKeyFieldTypes().get(0),
+                index.getKeyFieldNames().get(0), sourceType);
+        IAType secondaryKeyType = keyTypePair.first;
+        // Comparators and type traits for tokens.
+        int numTokenFields = (!isPartitioned) ? numSecondaryKeys : 
numSecondaryKeys + 1;
+        IBinaryComparatorFactory[] tokenComparatorFactories = new 
IBinaryComparatorFactory[numTokenFields];
+        tokenComparatorFactories[0] = 
NonTaggedFormatUtil.getTokenBinaryComparatorFactory(secondaryKeyType);
+        if (isPartitioned) {
+            // The partitioning field is hardcoded to be a short *without* an 
Asterix type tag.
+            tokenComparatorFactories[1] = 
PointableBinaryComparatorFactory.of(ShortPointable.FACTORY);
+        }
+        return tokenComparatorFactories;
+    }
+
+    private static IBinaryTokenizerFactory getTokenizerFactory(Dataset 
dataset, Index index, ARecordType recordType,
+            ARecordType metaType) throws AlgebricksException {
+        int numPrimaryKeys = dataset.getPrimaryKeys().size();
+        int numSecondaryKeys = index.getKeyFieldNames().size();
+        IndexType indexType = index.getIndexType();
+        // Sanity checks.
+        if (numPrimaryKeys > 1) {
+            throw new 
CompilationException(ErrorCode.COMPILATION_ILLEGAL_INDEX_FOR_DATASET_WITH_COMPOSITE_PRIMARY_INDEX,
+                    indexType, 
RecordUtil.toFullyQualifiedName(dataset.getDataverseName(), 
dataset.getDatasetName()));
+        }
+        if (numSecondaryKeys > 1) {
+            throw new 
CompilationException(ErrorCode.COMPILATION_ILLEGAL_INDEX_NUM_OF_FIELD, 
numSecondaryKeys,
+                    indexType, 1);
+        }
+        ARecordType sourceType;
+        List<Integer> keySourceIndicators = 
index.getKeyFieldSourceIndicators();
+        if (keySourceIndicators == null || keySourceIndicators.get(0) == 0) {
+            sourceType = recordType;
+        } else {
+            sourceType = metaType;
+        }
+        Pair<IAType, Boolean> keyTypePair = 
Index.getNonNullableOpenFieldType(index.getKeyFieldTypes().get(0),
+                index.getKeyFieldNames().get(0), sourceType);
+        IAType secondaryKeyType = keyTypePair.first;
+        // Set tokenizer factory.
+        // TODO: We might want to expose the hashing option at the AQL level,
+        // and add the choice to the index metadata.
+        return 
NonTaggedFormatUtil.getBinaryTokenizerFactory(secondaryKeyType.getTypeTag(), 
indexType,
+                index.getGramLength());
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/RTreeDataflowHelperFactoryProvider.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/RTreeDataflowHelperFactoryProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/RTreeDataflowHelperFactoryProvider.java
deleted file mode 100644
index 45034de..0000000
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/RTreeDataflowHelperFactoryProvider.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.utils;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider;
-import org.apache.asterix.common.context.IStorageComponentProvider;
-import org.apache.asterix.common.exceptions.CompilationException;
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.external.indexing.IndexingConstants;
-import org.apache.asterix.metadata.api.IIndexDataflowHelperFactoryProvider;
-import org.apache.asterix.metadata.declared.MetadataProvider;
-import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.metadata.entities.Index;
-import org.apache.asterix.metadata.entities.InternalDatasetDetails;
-import org.apache.asterix.metadata.lock.ExternalDatasetsRegistry;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.utils.NonTaggedFormatUtil;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.ITypeTraits;
-import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
-import 
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
-import 
org.apache.hyracks.storage.am.lsm.rtree.dataflow.ExternalRTreeDataflowHelperFactory;
-import 
org.apache.hyracks.storage.am.lsm.rtree.dataflow.LSMRTreeWithAntiMatterTuplesDataflowHelperFactory;
-import org.apache.hyracks.storage.am.rtree.frames.RTreePolicyType;
-
-public class RTreeDataflowHelperFactoryProvider implements 
IIndexDataflowHelperFactoryProvider {
-
-    public static final RTreeDataflowHelperFactoryProvider INSTANCE = new 
RTreeDataflowHelperFactoryProvider();
-
-    private RTreeDataflowHelperFactoryProvider() {
-    }
-
-    protected RTreePolicyType rTreePolicyType() {
-        return RTreePolicyType.RTREE;
-    }
-
-    @Override
-    public IIndexDataflowHelperFactory 
getIndexDataflowHelperFactory(MetadataProvider mdProvider, Dataset dataset,
-            Index index, ARecordType recordType, ARecordType metaType, 
ILSMMergePolicyFactory mergePolicyFactory,
-            Map<String, String> mergePolicyProperties, ITypeTraits[] 
filterTypeTraits,
-            IBinaryComparatorFactory[] filterCmpFactories) throws 
AlgebricksException {
-        if (index.getKeyFieldNames().size() != 1) {
-            throw new 
CompilationException(ErrorCode.COMPILATION_ILLEGAL_INDEX_NUM_OF_FIELD,
-                    index.getKeyFieldNames().size(), index.getIndexType(), 1);
-        }
-        IAType spatialType = 
Index.getNonNullableOpenFieldType(index.getKeyFieldTypes().get(0),
-                index.getKeyFieldNames().get(0), recordType).first;
-        if (spatialType == null) {
-            throw new 
CompilationException(ErrorCode.COMPILATION_FIELD_NOT_FOUND,
-                    StringUtils.join(index.getKeyFieldNames().get(0), '.'));
-        }
-        List<List<String>> primaryKeyFields = 
DatasetUtil.getPartitioningKeys(dataset);
-        int numPrimaryKeys = primaryKeyFields.size();
-        ITypeTraits[] primaryTypeTraits = null;
-        IBinaryComparatorFactory[] primaryComparatorFactories = null;
-        IStorageComponentProvider storageComponentProvider = 
mdProvider.getStorageComponentProvider();
-        if (dataset.getDatasetType() == DatasetType.INTERNAL) {
-            primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1 + 
(dataset.hasMetaPart() ? 1 : 0)];
-            primaryComparatorFactories = new 
IBinaryComparatorFactory[numPrimaryKeys];
-            List<Integer> indicators = null;
-            if (dataset.hasMetaPart()) {
-                indicators = ((InternalDatasetDetails) 
dataset.getDatasetDetails()).getKeySourceIndicator();
-            }
-            for (int i = 0; i < numPrimaryKeys; i++) {
-                IAType keyType = (indicators == null || indicators.get(i) == 0)
-                        ? recordType.getSubFieldType(primaryKeyFields.get(i))
-                        : metaType.getSubFieldType(primaryKeyFields.get(i));
-                primaryComparatorFactories[i] = 
storageComponentProvider.getComparatorFactoryProvider()
-                        .getBinaryComparatorFactory(keyType, true);
-                primaryTypeTraits[i] = 
storageComponentProvider.getTypeTraitProvider().getTypeTrait(keyType);
-            }
-            primaryTypeTraits[numPrimaryKeys] =
-                    
storageComponentProvider.getTypeTraitProvider().getTypeTrait(recordType);
-            if (dataset.hasMetaPart()) {
-                primaryTypeTraits[numPrimaryKeys + 1] =
-                        
storageComponentProvider.getTypeTraitProvider().getTypeTrait(recordType);
-            }
-        }
-        boolean isPointMBR =
-                spatialType.getTypeTag() == ATypeTag.POINT || 
spatialType.getTypeTag() == ATypeTag.POINT3D;
-        int numDimensions = 
NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
-        int numNestedSecondaryKeyFields = numDimensions * 2;
-        IBinaryComparatorFactory[] secondaryComparatorFactories =
-                new IBinaryComparatorFactory[numNestedSecondaryKeyFields];
-        IPrimitiveValueProviderFactory[] valueProviderFactories =
-                new 
IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
-        ITypeTraits[] secondaryTypeTraits = new 
ITypeTraits[numNestedSecondaryKeyFields + numPrimaryKeys];
-        IAType nestedKeyType = 
NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag());
-        ATypeTag keyType = nestedKeyType.getTypeTag();
-        for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
-            secondaryComparatorFactories[i] = 
storageComponentProvider.getComparatorFactoryProvider()
-                    .getBinaryComparatorFactory(nestedKeyType, true);
-            secondaryTypeTraits[i] = 
storageComponentProvider.getTypeTraitProvider().getTypeTrait(nestedKeyType);
-            valueProviderFactories[i] = 
storageComponentProvider.getPrimitiveValueProviderFactory();
-
-        }
-        for (int i = 0; i < numPrimaryKeys; i++) {
-            secondaryTypeTraits[numNestedSecondaryKeyFields + i] = 
(dataset.getDatasetType() == DatasetType.INTERNAL)
-                    ? primaryTypeTraits[i] : 
IndexingConstants.getTypeTraits(i);
-        }
-        int[] rtreeFields = null;
-        if (filterTypeTraits != null && filterTypeTraits.length > 0) {
-            rtreeFields = new int[numNestedSecondaryKeyFields + 
numPrimaryKeys];
-            for (int i = 0; i < rtreeFields.length; i++) {
-                rtreeFields[i] = i;
-            }
-        }
-        if (dataset.getDatasetType() == DatasetType.INTERNAL) {
-            int[] secondaryFilterFields = (filterTypeTraits != null && 
filterTypeTraits.length > 0)
-                    ? new int[] { numNestedSecondaryKeyFields + numPrimaryKeys 
} : null;
-            IBinaryComparatorFactory[] btreeCompFactories = 
getComparatorFactoriesForDeletedKeyBTree(
-                    secondaryTypeTraits, primaryComparatorFactories, 
secondaryComparatorFactories);
-            return new 
LSMRTreeWithAntiMatterTuplesDataflowHelperFactory(valueProviderFactories, 
rTreePolicyType(),
-                    btreeCompFactories, new 
AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
-                    mergePolicyFactory, mergePolicyProperties, 
dataset.getIndexOperationTrackerFactory(index),
-                    storageComponentProvider.getIoOperationSchedulerProvider(),
-                    dataset.getIoOperationCallbackFactory(index),
-                    MetadataProvider.proposeLinearizer(keyType, 
secondaryComparatorFactories.length), rtreeFields,
-                    filterTypeTraits, filterCmpFactories, 
secondaryFilterFields, !dataset.getDatasetDetails().isTemp(),
-                    isPointMBR);
-        } else {
-            return new 
ExternalRTreeDataflowHelperFactory(valueProviderFactories, rTreePolicyType(),
-                    
ExternalIndexingOperations.getBuddyBtreeComparatorFactories(), 
mergePolicyFactory,
-                    mergePolicyProperties, 
dataset.getIndexOperationTrackerFactory(index),
-                    storageComponentProvider.getIoOperationSchedulerProvider(),
-                    dataset.getIoOperationCallbackFactory(index),
-                    MetadataProvider.proposeLinearizer(keyType, 
secondaryComparatorFactories.length),
-                    
mdProvider.getStorageProperties().getBloomFilterFalsePositiveRate(),
-                    new int[] { numNestedSecondaryKeyFields },
-                    
ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(dataset), true, isPointMBR);
-        }
-    }
-
-    private static IBinaryComparatorFactory[] 
getComparatorFactoriesForDeletedKeyBTree(
-            ITypeTraits[] secondaryTypeTraits, IBinaryComparatorFactory[] 
primaryComparatorFactories,
-            IBinaryComparatorFactory[] secondaryComparatorFactories) {
-        IBinaryComparatorFactory[] btreeCompFactories = new 
IBinaryComparatorFactory[secondaryTypeTraits.length];
-        int i = 0;
-        for (; i < secondaryComparatorFactories.length; i++) {
-            btreeCompFactories[i] = secondaryComparatorFactories[i];
-        }
-        for (int j = 0; i < secondaryTypeTraits.length; i++, j++) {
-            btreeCompFactories[i] = primaryComparatorFactories[j];
-        }
-        return btreeCompFactories;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/RTreeResourceFactoryProvider.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/RTreeResourceFactoryProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/RTreeResourceFactoryProvider.java
new file mode 100644
index 0000000..425f8a1
--- /dev/null
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/RTreeResourceFactoryProvider.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.utils;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider;
+import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.indexing.IndexingConstants;
+import org.apache.asterix.metadata.api.IResourceFactoryProvider;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.utils.NonTaggedFormatUtil;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import org.apache.hyracks.algebricks.data.ITypeTraitProvider;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.storage.am.common.api.IMetadataPageManagerFactory;
+import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
+import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
+import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
+import 
org.apache.hyracks.storage.am.lsm.rtree.dataflow.ExternalRTreeLocalResourceFactory;
+import 
org.apache.hyracks.storage.am.lsm.rtree.dataflow.LSMRTreeWithAntiMatterLocalResourceFactory;
+import org.apache.hyracks.storage.am.rtree.frames.RTreePolicyType;
+import org.apache.hyracks.storage.common.IResourceFactory;
+import org.apache.hyracks.storage.common.IStorageManager;
+
+public class RTreeResourceFactoryProvider implements IResourceFactoryProvider {
+
+    private static final RTreePolicyType rTreePolicyType = 
RTreePolicyType.RTREE;
+    public static final RTreeResourceFactoryProvider INSTANCE = new 
RTreeResourceFactoryProvider();
+
+    private RTreeResourceFactoryProvider() {
+    }
+
+    @Override
+    public IResourceFactory getResourceFactory(MetadataProvider mdProvider, 
Dataset dataset, Index index,
+            ARecordType recordType, ARecordType metaType, 
ILSMMergePolicyFactory mergePolicyFactory,
+            Map<String, String> mergePolicyProperties, ITypeTraits[] 
filterTypeTraits,
+            IBinaryComparatorFactory[] filterCmpFactories) throws 
AlgebricksException {
+        if (index.getKeyFieldNames().size() != 1) {
+            throw new 
CompilationException(ErrorCode.COMPILATION_ILLEGAL_INDEX_NUM_OF_FIELD,
+                    index.getKeyFieldNames().size(), index.getIndexType(), 1);
+        }
+        IAType spatialType = 
Index.getNonNullableOpenFieldType(index.getKeyFieldTypes().get(0),
+                index.getKeyFieldNames().get(0), recordType).first;
+        if (spatialType == null) {
+            throw new 
CompilationException(ErrorCode.COMPILATION_FIELD_NOT_FOUND,
+                    StringUtils.join(index.getKeyFieldNames().get(0), '.'));
+        }
+        List<List<String>> primaryKeyFields = dataset.getPrimaryKeys();
+        int numPrimaryKeys = primaryKeyFields.size();
+        ITypeTraits[] primaryTypeTraits = null;
+        IBinaryComparatorFactory[] primaryComparatorFactories = null;
+        IStorageComponentProvider storageComponentProvider = 
mdProvider.getStorageComponentProvider();
+        if (dataset.getDatasetType() == DatasetType.INTERNAL) {
+            primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1 + 
(dataset.hasMetaPart() ? 1 : 0)];
+            primaryComparatorFactories = new 
IBinaryComparatorFactory[numPrimaryKeys];
+            List<Integer> indicators = null;
+            if (dataset.hasMetaPart()) {
+                indicators = ((InternalDatasetDetails) 
dataset.getDatasetDetails()).getKeySourceIndicator();
+            }
+            for (int i = 0; i < numPrimaryKeys; i++) {
+                IAType keyType = (indicators == null || indicators.get(i) == 0)
+                        ? recordType.getSubFieldType(primaryKeyFields.get(i))
+                        : metaType.getSubFieldType(primaryKeyFields.get(i));
+                primaryComparatorFactories[i] = 
storageComponentProvider.getComparatorFactoryProvider()
+                        .getBinaryComparatorFactory(keyType, true);
+                primaryTypeTraits[i] = 
storageComponentProvider.getTypeTraitProvider().getTypeTrait(keyType);
+            }
+            primaryTypeTraits[numPrimaryKeys] =
+                    
storageComponentProvider.getTypeTraitProvider().getTypeTrait(recordType);
+            if (dataset.hasMetaPart()) {
+                primaryTypeTraits[numPrimaryKeys + 1] =
+                        
storageComponentProvider.getTypeTraitProvider().getTypeTrait(recordType);
+            }
+        }
+        boolean isPointMBR = spatialType.getTypeTag() == ATypeTag.POINT || 
spatialType.getTypeTag() == ATypeTag.POINT3D;
+        int numDimensions = 
NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
+        int numNestedSecondaryKeyFields = numDimensions * 2;
+        IBinaryComparatorFactory[] secondaryComparatorFactories =
+                new IBinaryComparatorFactory[numNestedSecondaryKeyFields];
+        IPrimitiveValueProviderFactory[] valueProviderFactories =
+                new 
IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
+        ITypeTraits[] secondaryTypeTraits = new 
ITypeTraits[numNestedSecondaryKeyFields + numPrimaryKeys];
+        IAType nestedKeyType = 
NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag());
+        ATypeTag keyType = nestedKeyType.getTypeTag();
+        for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
+            secondaryComparatorFactories[i] = 
storageComponentProvider.getComparatorFactoryProvider()
+                    .getBinaryComparatorFactory(nestedKeyType, true);
+            secondaryTypeTraits[i] = 
storageComponentProvider.getTypeTraitProvider().getTypeTrait(nestedKeyType);
+            valueProviderFactories[i] = 
storageComponentProvider.getPrimitiveValueProviderFactory();
+
+        }
+        for (int i = 0; i < numPrimaryKeys; i++) {
+            secondaryTypeTraits[numNestedSecondaryKeyFields + i] = 
(dataset.getDatasetType() == DatasetType.INTERNAL)
+                    ? primaryTypeTraits[i] : 
IndexingConstants.getTypeTraits(i);
+        }
+        int[] rtreeFields = null;
+        if (filterTypeTraits != null && filterTypeTraits.length > 0) {
+            rtreeFields = new int[numNestedSecondaryKeyFields + 
numPrimaryKeys];
+            for (int i = 0; i < rtreeFields.length; i++) {
+                rtreeFields[i] = i;
+            }
+        }
+        IStorageManager storageManager = 
storageComponentProvider.getStorageManager();
+        ILSMOperationTrackerFactory opTrackerFactory = 
dataset.getIndexOperationTrackerFactory(index);
+        ILSMIOOperationCallbackFactory ioOpCallbackFactory = 
dataset.getIoOperationCallbackFactory(index);
+        IMetadataPageManagerFactory metadataPageManagerFactory =
+                storageComponentProvider.getMetadataPageManagerFactory();
+        ILSMIOOperationSchedulerProvider ioSchedulerProvider =
+                storageComponentProvider.getIoOperationSchedulerProvider();
+        boolean durable = !dataset.isTemp();
+        ILinearizeComparatorFactory linearizeCmpFactory =
+                MetadataProvider.proposeLinearizer(keyType, 
secondaryComparatorFactories.length);
+        ITypeTraits[] typeTraits = getTypeTraits(mdProvider, dataset, index, 
recordType, metaType);
+        IBinaryComparatorFactory[] rtreeCmpFactories = 
getCmpFactories(mdProvider, index, recordType, metaType);
+        int[] secondaryFilterFields = (filterTypeTraits != null && 
filterTypeTraits.length > 0)
+                ? new int[] { numNestedSecondaryKeyFields + numPrimaryKeys } : 
null;
+        IBinaryComparatorFactory[] btreeCompFactories =
+                dataset.getDatasetType() == DatasetType.EXTERNAL ? 
IndexingConstants.getBuddyBtreeComparatorFactories()
+                        : 
getComparatorFactoriesForDeletedKeyBTree(secondaryTypeTraits, 
primaryComparatorFactories,
+                                secondaryComparatorFactories);
+        if (dataset.getDatasetType() == DatasetType.INTERNAL) {
+            AsterixVirtualBufferCacheProvider vbcProvider =
+                    new 
AsterixVirtualBufferCacheProvider(dataset.getDatasetId());
+            return new 
LSMRTreeWithAntiMatterLocalResourceFactory(storageManager, typeTraits, 
rtreeCmpFactories,
+                    filterTypeTraits, filterCmpFactories, 
secondaryFilterFields, opTrackerFactory, ioOpCallbackFactory,
+                    metadataPageManagerFactory, vbcProvider, 
ioSchedulerProvider, mergePolicyFactory,
+                    mergePolicyProperties, durable, valueProviderFactories, 
rTreePolicyType, linearizeCmpFactory,
+                    rtreeFields, isPointMBR, btreeCompFactories);
+        } else {
+            return new ExternalRTreeLocalResourceFactory(storageManager, 
typeTraits, rtreeCmpFactories,
+                    filterTypeTraits, filterCmpFactories, 
secondaryFilterFields, opTrackerFactory, ioOpCallbackFactory,
+                    metadataPageManagerFactory, ioSchedulerProvider, 
mergePolicyFactory, mergePolicyProperties, durable,
+                    btreeCompFactories, valueProviderFactories, 
rTreePolicyType, linearizeCmpFactory, rtreeFields,
+                    new int[] { numNestedSecondaryKeyFields }, isPointMBR,
+                    
mdProvider.getStorageProperties().getBloomFilterFalsePositiveRate());
+        }
+    }
+
+    private static IBinaryComparatorFactory[] 
getComparatorFactoriesForDeletedKeyBTree(
+            ITypeTraits[] secondaryTypeTraits, IBinaryComparatorFactory[] 
primaryComparatorFactories,
+            IBinaryComparatorFactory[] secondaryComparatorFactories) {
+        IBinaryComparatorFactory[] btreeCompFactories = new 
IBinaryComparatorFactory[secondaryTypeTraits.length];
+        int i = 0;
+        for (; i < secondaryComparatorFactories.length; i++) {
+            btreeCompFactories[i] = secondaryComparatorFactories[i];
+        }
+        for (int j = 0; i < secondaryTypeTraits.length; i++, j++) {
+            btreeCompFactories[i] = primaryComparatorFactories[j];
+        }
+        return btreeCompFactories;
+    }
+
+    private static ITypeTraits[] getTypeTraits(MetadataProvider 
metadataProvider, Dataset dataset, Index index,
+            ARecordType recordType, ARecordType metaType) throws 
AlgebricksException {
+        ITypeTraitProvider ttProvider = 
metadataProvider.getStorageComponentProvider().getTypeTraitProvider();
+        List<List<String>> secondaryKeyFields = index.getKeyFieldNames();
+        int numSecondaryKeys = secondaryKeyFields.size();
+        int numPrimaryKeys = dataset.getPrimaryKeys().size();
+        ITypeTraits[] primaryTypeTraits = 
dataset.getPrimaryTypeTraits(metadataProvider, recordType, metaType);
+        if (numSecondaryKeys != 1) {
+            throw new AsterixException("Cannot use " + numSecondaryKeys + " 
fields as a key for the R-tree index. "
+                    + "There can be only one field as a key for the R-tree 
index.");
+        }
+        ARecordType sourceType;
+        List<Integer> keySourceIndicators = 
index.getKeyFieldSourceIndicators();
+        if (keySourceIndicators == null || keySourceIndicators.get(0) == 0) {
+            sourceType = recordType;
+        } else {
+            sourceType = metaType;
+        }
+        Pair<IAType, Boolean> spatialTypePair = 
Index.getNonNullableOpenFieldType(index.getKeyFieldTypes().get(0),
+                secondaryKeyFields.get(0), sourceType);
+        IAType spatialType = spatialTypePair.first;
+        if (spatialType == null) {
+            throw new AsterixException("Could not find field " + 
secondaryKeyFields.get(0) + " in the schema.");
+        }
+        int numDimensions = 
NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
+        int numNestedSecondaryKeyFields = numDimensions * 2;
+        ITypeTraits[] secondaryTypeTraits = new 
ITypeTraits[numNestedSecondaryKeyFields + numPrimaryKeys];
+        IAType nestedKeyType = 
NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag());
+        for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
+            secondaryTypeTraits[i] = ttProvider.getTypeTrait(nestedKeyType);
+        }
+        for (int i = 0; i < numPrimaryKeys; i++) {
+            secondaryTypeTraits[numNestedSecondaryKeyFields + i] = 
primaryTypeTraits[i];
+        }
+        return secondaryTypeTraits;
+    }
+
+    private static IBinaryComparatorFactory[] getCmpFactories(MetadataProvider 
metadataProvider, Index index,
+            ARecordType recordType, ARecordType metaType) throws 
AlgebricksException {
+        IBinaryComparatorFactoryProvider cmpFactoryProvider =
+                
metadataProvider.getStorageComponentProvider().getComparatorFactoryProvider();
+        List<List<String>> secondaryKeyFields = index.getKeyFieldNames();
+        int numSecondaryKeys = secondaryKeyFields.size();
+        if (numSecondaryKeys != 1) {
+            throw new AsterixException("Cannot use " + numSecondaryKeys + " 
fields as a key for the R-tree index. "
+                    + "There can be only one field as a key for the R-tree 
index.");
+        }
+        List<Integer> keySourceIndicators = 
index.getKeyFieldSourceIndicators();
+        ARecordType sourceType;
+        if (keySourceIndicators == null || keySourceIndicators.get(0) == 0) {
+            sourceType = recordType;
+        } else {
+            sourceType = metaType;
+        }
+        Pair<IAType, Boolean> spatialTypePair = 
Index.getNonNullableOpenFieldType(index.getKeyFieldTypes().get(0),
+                secondaryKeyFields.get(0), sourceType);
+        IAType spatialType = spatialTypePair.first;
+        if (spatialType == null) {
+            throw new AsterixException("Could not find field " + 
secondaryKeyFields.get(0) + " in the schema.");
+        }
+        IAType nestedKeyType = 
NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag());
+        int numDimensions = 
NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
+        int numNestedSecondaryKeyFields = numDimensions * 2;
+        IBinaryComparatorFactory[] secondaryComparatorFactories =
+                new IBinaryComparatorFactory[numNestedSecondaryKeyFields];
+        for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
+            secondaryComparatorFactories[i] = 
cmpFactoryProvider.getBinaryComparatorFactory(nestedKeyType, true);
+        }
+        return secondaryComparatorFactories;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
index faea7fd..1f7914d 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
@@ -22,7 +22,6 @@ import java.util.List;
 
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.GlobalConfig;
-import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.external.indexing.IndexingConstants;
 import org.apache.asterix.external.operators.ExternalScanOperatorDescriptor;
 import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -31,10 +30,6 @@ import org.apache.asterix.metadata.entities.Index;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.runtime.utils.RuntimeUtils;
-import 
org.apache.asterix.transaction.management.resource.ExternalBTreeWithBuddyLocalResourceMetadataFactory;
-import 
org.apache.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadataFactory;
-import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
-import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import 
org.apache.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy;
@@ -53,16 +48,13 @@ import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import 
org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import 
org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
-import 
org.apache.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
 import 
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import 
org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
 import 
org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
-import 
org.apache.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
-import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
-import org.apache.hyracks.storage.common.file.ILocalResourceFactoryProvider;
-import org.apache.hyracks.storage.common.file.LocalResource;
 
 public class SecondaryBTreeOperationsHelper extends 
SecondaryTreeIndexOperationsHelper {
 
@@ -73,57 +65,12 @@ public class SecondaryBTreeOperationsHelper extends 
SecondaryTreeIndexOperations
     }
 
     @Override
-    public JobSpecification buildCreationJobSpec() throws AlgebricksException {
-        JobSpecification spec = 
RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
-        ILocalResourceFactoryProvider localResourceFactoryProvider;
-        IIndexDataflowHelperFactory indexDataflowHelperFactory = 
dataset.getIndexDataflowHelperFactory(metadataProvider,
-                index, itemType, metaType, mergePolicyFactory, 
mergePolicyFactoryProperties);
-        IStorageComponentProvider storageComponentProvider = 
metadataProvider.getStorageComponentProvider();
-        if (dataset.getDatasetType() == DatasetType.INTERNAL) {
-            //prepare a LocalResourceMetadata which will be stored in NC's 
local resource repository
-            LSMBTreeLocalResourceMetadataFactory localResourceMetadata = new 
LSMBTreeLocalResourceMetadataFactory(
-                    secondaryTypeTraits, secondaryComparatorFactories, 
secondaryBloomFilterKeyFields, false,
-                    dataset.getDatasetId(), mergePolicyFactory, 
mergePolicyFactoryProperties, filterTypeTraits,
-                    filterCmpFactories, secondaryBTreeFields, 
secondaryFilterFields,
-                    dataset.getIndexOperationTrackerFactory(index), 
dataset.getIoOperationCallbackFactory(index),
-                    storageComponentProvider.getMetadataPageManagerFactory());
-            localResourceFactoryProvider =
-                    new 
PersistentLocalResourceFactoryProvider(localResourceMetadata, 
LocalResource.LSMBTreeResource);
-        } else {
-            // External dataset local resource and dataflow helper
-            int[] buddyBreeFields = new int[] { 
index.getKeyFieldNames().size() };
-            ExternalBTreeWithBuddyLocalResourceMetadataFactory 
localResourceMetadata =
-                    new 
ExternalBTreeWithBuddyLocalResourceMetadataFactory(dataset.getDatasetId(),
-                            secondaryComparatorFactories, secondaryTypeTraits, 
mergePolicyFactory,
-                            mergePolicyFactoryProperties, buddyBreeFields,
-                            dataset.getIndexOperationTrackerFactory(index),
-                            dataset.getIoOperationCallbackFactory(index),
-                            
storageComponentProvider.getMetadataPageManagerFactory());
-            localResourceFactoryProvider = new 
PersistentLocalResourceFactoryProvider(localResourceMetadata,
-                    LocalResource.ExternalBTreeWithBuddyResource);
-        }
-        TreeIndexCreateOperatorDescriptor secondaryIndexCreateOp =
-                new TreeIndexCreateOperatorDescriptor(spec, 
storageComponentProvider.getStorageManager(),
-                        
storageComponentProvider.getIndexLifecycleManagerProvider(), 
secondaryFileSplitProvider,
-                        secondaryTypeTraits, secondaryComparatorFactories, 
secondaryBloomFilterKeyFields,
-                        indexDataflowHelperFactory,
-                        localResourceFactoryProvider, 
dataset.getModificationCallbackFactory(storageComponentProvider,
-                                index, null, IndexOperation.CREATE, null),
-                        
storageComponentProvider.getMetadataPageManagerFactory());
-        
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
secondaryIndexCreateOp,
-                secondaryPartitionConstraint);
-        spec.addRoot(secondaryIndexCreateOp);
-        spec.setConnectorPolicyAssignmentPolicy(new 
ConnectorPolicyAssignmentPolicy());
-        return spec;
-    }
-
-    @Override
     public JobSpecification buildLoadingJobSpec() throws AlgebricksException {
         JobSpecification spec = 
RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
         boolean isEnforcingKeyTypes = index.isEnforcingKeyFileds();
         int[] fieldPermutation = 
createFieldPermutationForBulkLoadOp(index.getKeyFieldNames().size());
-        IIndexDataflowHelperFactory dataflowHelperFactory = 
dataset.getIndexDataflowHelperFactory(metadataProvider,
-                index, itemType, metaType, mergePolicyFactory, 
mergePolicyFactoryProperties);
+        IIndexDataflowHelperFactory dataflowHelperFactory = new 
IndexDataflowHelperFactory(
+                
metadataProvider.getStorageComponentProvider().getStorageManager(), 
secondaryFileSplitProvider);
         if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
             /*
              * In case of external data,
@@ -152,23 +99,22 @@ public class SecondaryBTreeOperationsHelper extends 
SecondaryTreeIndexOperations
             // Sort by secondary keys.
             ExternalSortOperatorDescriptor sortOp = createSortOp(spec, 
secondaryComparatorFactories, secondaryRecDesc);
             // Create secondary BTree bulk load op.
-            AbstractTreeIndexOperatorDescriptor secondaryBulkLoadOp;
+            AbstractSingleActivityOperatorDescriptor secondaryBulkLoadOp;
             IOperatorDescriptor root;
             if (externalFiles != null) {
                 // Transaction load
                 secondaryBulkLoadOp = createExternalIndexBulkModifyOp(spec, 
fieldPermutation, dataflowHelperFactory,
                         GlobalConfig.DEFAULT_TREE_FILL_FACTOR);
-                root = secondaryBulkLoadOp;
             } else {
                 // Initial load
-                secondaryBulkLoadOp = createTreeIndexBulkLoadOp(spec, 
fieldPermutation, dataflowHelperFactory,
+                secondaryBulkLoadOp = createExternalIndexBulkLoadOp(spec, 
fieldPermutation, dataflowHelperFactory,
                         GlobalConfig.DEFAULT_TREE_FILL_FACTOR);
-                AlgebricksMetaOperatorDescriptor metaOp = new 
AlgebricksMetaOperatorDescriptor(spec, 1, 0,
-                        new IPushRuntimeFactory[] { new SinkRuntimeFactory() },
-                        new RecordDescriptor[] { secondaryRecDesc });
-                spec.connect(new OneToOneConnectorDescriptor(spec), 
secondaryBulkLoadOp, 0, metaOp, 0);
-                root = metaOp;
             }
+            AlgebricksMetaOperatorDescriptor metaOp = new 
AlgebricksMetaOperatorDescriptor(spec, 1, 0,
+                    new IPushRuntimeFactory[] { new SinkRuntimeFactory() },
+                    new RecordDescriptor[] { secondaryRecDesc });
+            spec.connect(new OneToOneConnectorDescriptor(spec), 
secondaryBulkLoadOp, 0, metaOp, 0);
+            root = metaOp;
             spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, 
asterixAssignOp, 0);
             if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) {
                 spec.connect(new OneToOneConnectorDescriptor(spec), 
asterixAssignOp, 0, selectOp, 0);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
index e5b5b9f..fa519e1 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
@@ -34,17 +34,18 @@ import 
org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
 import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.external.indexing.IndexingConstants;
+import 
org.apache.asterix.external.operators.ExternalIndexBulkLoadOperatorDescriptor;
 import 
org.apache.asterix.external.operators.ExternalIndexBulkModifyOperatorDescriptor;
 import org.apache.asterix.external.operators.ExternalScanOperatorDescriptor;
 import org.apache.asterix.formats.nontagged.BinaryBooleanInspector;
 import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
 import org.apache.asterix.formats.nontagged.TypeTraitProvider;
-import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Index;
 import org.apache.asterix.metadata.entities.InternalDatasetDetails;
+import org.apache.asterix.metadata.lock.ExternalDatasetsRegistry;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.runtime.evaluators.functions.AndDescriptor;
@@ -52,7 +53,6 @@ import 
org.apache.asterix.runtime.evaluators.functions.CastTypeDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.IsUnknownDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.NotDescriptor;
 import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
-import org.apache.asterix.runtime.utils.RuntimeComponentsProvider;
 import 
org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
 import 
org.apache.asterix.transaction.management.service.transaction.JobIdFactory;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -83,6 +83,7 @@ import 
org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import 
org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
 import 
org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import 
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import 
org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
 import 
org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
@@ -117,7 +118,7 @@ public abstract class SecondaryIndexOperationsHelper {
     protected RecordDescriptor secondaryRecDesc;
     protected IScalarEvaluatorFactory[] secondaryFieldAccessEvalFactories;
     protected ILSMMergePolicyFactory mergePolicyFactory;
-    protected Map<String, String> mergePolicyFactoryProperties;
+    protected Map<String, String> mergePolicyProperties;
     protected RecordDescriptor enforcedRecDesc;
     protected int numFilterFields;
     protected List<String> filterFieldName;
@@ -132,8 +133,8 @@ public abstract class SecondaryIndexOperationsHelper {
 
     // Prevent public construction. Should be created via createIndexCreator().
     protected SecondaryIndexOperationsHelper(Dataset dataset, Index index, 
PhysicalOptimizationConfig physOptConf,
-            MetadataProvider metadataProvider, ARecordType recType,
-            ARecordType metaType, ARecordType enforcedType, ARecordType 
enforcedMetaType) {
+            MetadataProvider metadataProvider, ARecordType recType, 
ARecordType metaType, ARecordType enforcedType,
+            ARecordType enforcedMetaType) {
         this.dataset = dataset;
         this.index = index;
         this.physOptConf = physOptConf;
@@ -150,14 +151,12 @@ public abstract class SecondaryIndexOperationsHelper {
         SecondaryIndexOperationsHelper indexOperationsHelper;
         switch (index.getIndexType()) {
             case BTREE:
-                indexOperationsHelper =
-                        new SecondaryBTreeOperationsHelper(dataset, index, 
physOptConf,
-                                metadataProvider, recType, metaType, 
enforcedType, enforcedMetaType);
+                indexOperationsHelper = new 
SecondaryBTreeOperationsHelper(dataset, index, physOptConf,
+                        metadataProvider, recType, metaType, enforcedType, 
enforcedMetaType);
                 break;
             case RTREE:
-                indexOperationsHelper =
-                        new SecondaryRTreeOperationsHelper(dataset, index, 
physOptConf,
-                                metadataProvider, recType, metaType, 
enforcedType, enforcedMetaType);
+                indexOperationsHelper = new 
SecondaryRTreeOperationsHelper(dataset, index, physOptConf,
+                        metadataProvider, recType, metaType, enforcedType, 
enforcedMetaType);
                 break;
             case SINGLE_PARTITION_WORD_INVIX:
             case SINGLE_PARTITION_NGRAM_INVIX:
@@ -189,7 +188,7 @@ public abstract class SecondaryIndexOperationsHelper {
                 metadataProvider.getSplitProviderAndConstraints(dataset, 
index.getIndexName());
         secondaryFileSplitProvider = secondarySplitsAndConstraint.first;
         secondaryPartitionConstraint = secondarySplitsAndConstraint.second;
-        numPrimaryKeys = DatasetUtil.getPartitioningKeys(dataset).size();
+        numPrimaryKeys = dataset.getPrimaryKeys().size();
         if (dataset.getDatasetType() == DatasetType.INTERNAL) {
             filterFieldName = DatasetUtil.getFilterField(dataset);
             if (filterFieldName != null) {
@@ -208,7 +207,7 @@ public abstract class SecondaryIndexOperationsHelper {
         Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                 DatasetUtil.getMergePolicyFactory(dataset, 
metadataProvider.getMetadataTxnContext());
         mergePolicyFactory = compactionInfo.first;
-        mergePolicyFactoryProperties = compactionInfo.second;
+        mergePolicyProperties = compactionInfo.second;
         if (numFilterFields > 0) {
             setFilterTypeTraitsAndComparators();
         }
@@ -238,7 +237,7 @@ public abstract class SecondaryIndexOperationsHelper {
     protected abstract int getNumSecondaryKeys();
 
     protected void setPrimaryRecDescAndComparators() throws 
AlgebricksException {
-        List<List<String>> partitioningKeys = 
DatasetUtil.getPartitioningKeys(dataset);
+        List<List<String>> partitioningKeys = dataset.getPrimaryKeys();
         ISerializerDeserializer[] primaryRecFields =
                 new ISerializerDeserializer[numPrimaryKeys + 1 + 
(dataset.hasMetaPart() ? 1 : 0)];
         ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1 + 
(dataset.hasMetaPart() ? 1 : 0)];
@@ -292,8 +291,7 @@ public abstract class SecondaryIndexOperationsHelper {
         return keyProviderOp;
     }
 
-    protected BTreeSearchOperatorDescriptor 
createPrimaryIndexScanOp(JobSpecification spec)
-            throws AlgebricksException {
+    protected BTreeSearchOperatorDescriptor 
createPrimaryIndexScanOp(JobSpecification spec) throws AlgebricksException {
         // -Infinity
         int[] lowKeyFields = null;
         // +Infinity
@@ -304,22 +302,15 @@ public abstract class SecondaryIndexOperationsHelper {
         boolean isWriteTransaction = metadataProvider.isWriteTransaction();
         IJobletEventListenerFactory jobEventListenerFactory = new 
JobEventListenerFactory(jobId, isWriteTransaction);
         spec.setJobletEventListenerFactory(jobEventListenerFactory);
-        Index primaryIndex = 
MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
-                dataset.getDataverseName(), dataset.getDatasetName(), 
dataset.getDatasetName());
-
         boolean temp = dataset.getDatasetDetails().isTemp();
         ISearchOperationCallbackFactory searchCallbackFactory = temp ? 
NoOpOperationCallbackFactory.INSTANCE
                 : new PrimaryIndexInstantSearchOperationCallbackFactory(jobId, 
dataset.getDatasetId(),
                         primaryBloomFilterKeyFields, txnSubsystemProvider, 
ResourceType.LSM_BTREE);
-        BTreeSearchOperatorDescriptor primarySearchOp = new 
BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
-                RuntimeComponentsProvider.RUNTIME_PROVIDER, 
RuntimeComponentsProvider.RUNTIME_PROVIDER,
-                primaryFileSplitProvider, primaryRecDesc.getTypeTraits(), 
primaryComparatorFactories,
-                primaryBloomFilterKeyFields, lowKeyFields, highKeyFields, 
true, true,
-                dataset.getIndexDataflowHelperFactory(metadataProvider, 
primaryIndex, itemType, metaType,
-                        mergePolicyFactory, mergePolicyFactoryProperties),
-                false, false, null, searchCallbackFactory, null, null,
-                
metadataProvider.getStorageComponentProvider().getMetadataPageManagerFactory());
-
+        IndexDataflowHelperFactory indexHelperFactory = new 
IndexDataflowHelperFactory(
+                
metadataProvider.getStorageComponentProvider().getStorageManager(), 
primaryFileSplitProvider);
+        BTreeSearchOperatorDescriptor primarySearchOp =
+                new BTreeSearchOperatorDescriptor(spec, primaryRecDesc, 
lowKeyFields, highKeyFields, true, true,
+                        indexHelperFactory, false, false, null, 
searchCallbackFactory, null, null, false);
         
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
primarySearchOp,
                 primaryPartitionConstraint);
         return primarySearchOp;
@@ -401,11 +392,18 @@ public abstract class SecondaryIndexOperationsHelper {
             int[] fieldPermutation, IIndexDataflowHelperFactory 
dataflowHelperFactory, float fillFactor)
             throws AlgebricksException {
         TreeIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new 
TreeIndexBulkLoadOperatorDescriptor(spec,
-                secondaryRecDesc, RuntimeComponentsProvider.RUNTIME_PROVIDER,
-                RuntimeComponentsProvider.RUNTIME_PROVIDER, 
secondaryFileSplitProvider,
-                secondaryRecDesc.getTypeTraits(), 
secondaryComparatorFactories, secondaryBloomFilterKeyFields,
-                fieldPermutation, fillFactor, false, numElementsHint, false, 
dataflowHelperFactory,
-                
metadataProvider.getStorageComponentProvider().getMetadataPageManagerFactory());
+                secondaryRecDesc, fieldPermutation, fillFactor, false, 
numElementsHint, false, dataflowHelperFactory);
+        
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
treeIndexBulkLoadOp,
+                secondaryPartitionConstraint);
+        return treeIndexBulkLoadOp;
+    }
+
+    protected TreeIndexBulkLoadOperatorDescriptor 
createExternalIndexBulkLoadOp(JobSpecification spec,
+            int[] fieldPermutation, IIndexDataflowHelperFactory 
dataflowHelperFactory, float fillFactor)
+            throws AlgebricksException {
+        ExternalIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new 
ExternalIndexBulkLoadOperatorDescriptor(spec,
+                secondaryRecDesc, fieldPermutation, fillFactor, false, 
numElementsHint, false, dataflowHelperFactory,
+                
ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, 
metadataProvider));
         
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
treeIndexBulkLoadOp,
                 secondaryPartitionConstraint);
         return treeIndexBulkLoadOp;
@@ -517,11 +515,7 @@ public abstract class SecondaryIndexOperationsHelper {
             }
         }
         ExternalIndexBulkModifyOperatorDescriptor treeIndexBulkLoadOp = new 
ExternalIndexBulkModifyOperatorDescriptor(
-                spec, RuntimeComponentsProvider.RUNTIME_PROVIDER, 
RuntimeComponentsProvider.RUNTIME_PROVIDER,
-                secondaryFileSplitProvider, secondaryTypeTraits, 
secondaryComparatorFactories,
-                secondaryBloomFilterKeyFields, dataflowHelperFactory, 
NoOpOperationCallbackFactory.INSTANCE,
-                deletedFiles, fieldPermutation, fillFactor, numElementsHint,
-                
metadataProvider.getStorageComponentProvider().getMetadataPageManagerFactory());
+                spec, dataflowHelperFactory, deletedFiles, fieldPermutation, 
fillFactor, false, numElementsHint, false);
         
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
treeIndexBulkLoadOp,
                 secondaryPartitionConstraint);
         return treeIndexBulkLoadOp;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
index 985f8cc..1bb1377 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
@@ -18,14 +18,10 @@
  */
 package org.apache.asterix.metadata.utils;
 
-import java.util.Map;
-
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
-import org.apache.asterix.common.context.IStorageComponentProvider;
-import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.transactions.IResourceFactory;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Index;
@@ -35,9 +31,6 @@ import org.apache.asterix.om.utils.NonTaggedFormatUtil;
 import org.apache.asterix.om.utils.RecordUtil;
 import org.apache.asterix.runtime.formats.FormatUtils;
 import org.apache.asterix.runtime.utils.RuntimeUtils;
-import 
org.apache.asterix.transaction.management.resource.LSMInvertedIndexLocalResourceMetadataFactory;
-import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
-import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -58,23 +51,16 @@ import 
org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
 import org.apache.hyracks.data.std.primitive.ShortPointable;
 import 
org.apache.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer;
 import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import 
org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
 import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import 
org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
 import 
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+import 
org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
 import 
org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
-import 
org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexBulkLoadOperatorDescriptor;
-import 
org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexCompactOperator;
-import 
org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexCreateOperatorDescriptor;
-import 
org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexDropOperatorDescriptor;
 import 
org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
-import org.apache.hyracks.storage.common.file.ILocalResourceFactoryProvider;
-import org.apache.hyracks.storage.common.file.LocalResource;
 
-public class SecondaryInvertedIndexOperationsHelper extends 
SecondaryIndexOperationsHelper {
+public class SecondaryInvertedIndexOperationsHelper extends 
SecondaryTreeIndexOperationsHelper {
 
     private IAType secondaryKeyType;
     private ITypeTraits[] invListsTypeTraits;
@@ -219,36 +205,6 @@ public class SecondaryInvertedIndexOperationsHelper 
extends SecondaryIndexOperat
     }
 
     @Override
-    public JobSpecification buildCreationJobSpec() throws AlgebricksException {
-        JobSpecification spec = 
RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
-        IStorageComponentProvider storageComponentProvider = 
metadataProvider.getStorageComponentProvider();
-        //prepare a LocalResourceMetadata which will be stored in NC's local 
resource repository
-        IResourceFactory localResourceMetadata = new 
LSMInvertedIndexLocalResourceMetadataFactory(invListsTypeTraits,
-                primaryComparatorFactories, tokenTypeTraits, 
tokenComparatorFactories, tokenizerFactory, isPartitioned,
-                dataset.getDatasetId(), mergePolicyFactory, 
mergePolicyFactoryProperties, filterTypeTraits,
-                filterCmpFactories, invertedIndexFields, 
secondaryFilterFields, secondaryFilterFieldsForNonBulkLoadOps,
-                invertedIndexFieldsForNonBulkLoadOps, 
dataset.getIndexOperationTrackerFactory(index),
-                dataset.getIoOperationCallbackFactory(index), 
storageComponentProvider.getMetadataPageManagerFactory());
-        ILocalResourceFactoryProvider localResourceFactoryProvider = new 
PersistentLocalResourceFactoryProvider(
-                localResourceMetadata, LocalResource.LSMInvertedIndexResource);
-
-        IIndexDataflowHelperFactory dataflowHelperFactory = 
createDataflowHelperFactory();
-        LSMInvertedIndexCreateOperatorDescriptor invIndexCreateOp =
-                new LSMInvertedIndexCreateOperatorDescriptor(spec, 
storageComponentProvider.getStorageManager(),
-                        secondaryFileSplitProvider, 
storageComponentProvider.getIndexLifecycleManagerProvider(),
-                        tokenTypeTraits, tokenComparatorFactories, 
invListsTypeTraits, primaryComparatorFactories,
-                        tokenizerFactory, dataflowHelperFactory,
-                        localResourceFactoryProvider, 
dataset.getModificationCallbackFactory(storageComponentProvider,
-                                index, null, IndexOperation.CREATE, null),
-                        
storageComponentProvider.getMetadataPageManagerFactory());
-        
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
invIndexCreateOp,
-                secondaryPartitionConstraint);
-        spec.addRoot(invIndexCreateOp);
-        spec.setConnectorPolicyAssignmentPolicy(new 
ConnectorPolicyAssignmentPolicy());
-        return spec;
-    }
-
-    @Override
     public JobSpecification buildLoadingJobSpec() throws AlgebricksException {
         JobSpecification spec = 
RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
 
@@ -282,7 +238,9 @@ public class SecondaryInvertedIndexOperationsHelper extends 
SecondaryIndexOperat
                 createSortOp(spec, tokenKeyPairComparatorFactories, 
tokenKeyPairRecDesc);
 
         // Create secondary inverted index bulk load op.
-        LSMInvertedIndexBulkLoadOperatorDescriptor invIndexBulkLoadOp = 
createInvertedIndexBulkLoadOp(spec);
+        AbstractSingleActivityOperatorDescriptor invIndexBulkLoadOp = 
createInvertedIndexBulkLoadOp(spec);
+        
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
invIndexBulkLoadOp,
+                secondaryPartitionConstraint);
 
         AlgebricksMetaOperatorDescriptor metaOp = new 
AlgebricksMetaOperatorDescriptor(spec, 1, 0,
                 new IPushRuntimeFactory[] { new SinkRuntimeFactory() }, new 
RecordDescriptor[] {});
@@ -331,71 +289,15 @@ public class SecondaryInvertedIndexOperationsHelper 
extends SecondaryIndexOperat
         return sortOp;
     }
 
-    private LSMInvertedIndexBulkLoadOperatorDescriptor 
createInvertedIndexBulkLoadOp(JobSpecification spec)
+    private AbstractSingleActivityOperatorDescriptor 
createInvertedIndexBulkLoadOp(JobSpecification spec)
             throws AlgebricksException {
         int[] fieldPermutation = new int[numTokenKeyPairFields + 
numFilterFields];
         for (int i = 0; i < fieldPermutation.length; i++) {
             fieldPermutation[i] = i;
         }
-        IIndexDataflowHelperFactory dataflowHelperFactory = 
createDataflowHelperFactory();
-        IStorageComponentProvider storageComponentProvider = 
metadataProvider.getStorageComponentProvider();
-        LSMInvertedIndexBulkLoadOperatorDescriptor invIndexBulkLoadOp = new 
LSMInvertedIndexBulkLoadOperatorDescriptor(
-                spec, secondaryRecDesc, fieldPermutation, false, 
numElementsHint, false,
-                storageComponentProvider.getStorageManager(), 
secondaryFileSplitProvider,
-                storageComponentProvider.getIndexLifecycleManagerProvider(), 
tokenTypeTraits, tokenComparatorFactories,
-                invListsTypeTraits, primaryComparatorFactories, 
tokenizerFactory, dataflowHelperFactory,
-                storageComponentProvider.getMetadataPageManagerFactory());
-        
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
invIndexBulkLoadOp,
-                secondaryPartitionConstraint);
-        return invIndexBulkLoadOp;
-    }
-
-    private IIndexDataflowHelperFactory createDataflowHelperFactory() throws 
AlgebricksException {
-        return dataset.getIndexDataflowHelperFactory(metadataProvider, index, 
itemType, metaType, mergePolicyFactory,
-                mergePolicyFactoryProperties);
-    }
-
-    @Override
-    public JobSpecification buildCompactJobSpec() throws AsterixException, 
AlgebricksException {
-        JobSpecification spec = 
RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
-        IIndexDataflowHelperFactory dataflowHelperFactory = 
createDataflowHelperFactory();
-        IStorageComponentProvider storageComponentProvider = 
metadataProvider.getStorageComponentProvider();
-        LSMInvertedIndexCompactOperator compactOp =
-                new LSMInvertedIndexCompactOperator(spec, 
storageComponentProvider.getStorageManager(),
-                        secondaryFileSplitProvider, 
storageComponentProvider.getIndexLifecycleManagerProvider(),
-                        tokenTypeTraits, tokenComparatorFactories, 
invListsTypeTraits, primaryComparatorFactories,
-                        tokenizerFactory, dataflowHelperFactory,
-                        
dataset.getModificationCallbackFactory(storageComponentProvider, index, null,
-                                IndexOperation.FULL_MERGE, null),
-                        
storageComponentProvider.getMetadataPageManagerFactory());
-        
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
compactOp,
-                secondaryPartitionConstraint);
-
-        spec.addRoot(compactOp);
-        spec.setConnectorPolicyAssignmentPolicy(new 
ConnectorPolicyAssignmentPolicy());
-        return spec;
-    }
-
-    @Override
-    public JobSpecification buildDropJobSpec() throws AlgebricksException {
-        JobSpecification spec = 
RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
-        IStorageComponentProvider storageComponentProvider = 
metadataProvider.getStorageComponentProvider();
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitsAndConstraint =
-                metadataProvider.getSplitProviderAndConstraints(dataset, 
index.getIndexName());
-        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
-                DatasetUtil.getMergePolicyFactory(dataset, 
metadataProvider.getMetadataTxnContext());
-        ARecordType recordType =
-                (ARecordType) 
metadataProvider.findType(dataset.getItemTypeDataverseName(), 
dataset.getItemTypeName());
-        ARecordType metaType = DatasetUtil.getMetaType(metadataProvider, 
dataset);
-        IIndexDataflowHelperFactory dataflowHelperFactory = 
dataset.getIndexDataflowHelperFactory(metadataProvider,
-                index, recordType, metaType, compactionInfo.first, 
compactionInfo.second);
-        LSMInvertedIndexDropOperatorDescriptor invIdxDrop =
-                new LSMInvertedIndexDropOperatorDescriptor(spec, 
storageComponentProvider.getStorageManager(),
-                        
storageComponentProvider.getIndexLifecycleManagerProvider(), 
splitsAndConstraint.first,
-                        dataflowHelperFactory, 
storageComponentProvider.getMetadataPageManagerFactory());
-        
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
invIdxDrop,
-                splitsAndConstraint.second);
-        spec.addRoot(invIdxDrop);
-        return spec;
+        IIndexDataflowHelperFactory dataflowHelperFactory = new 
IndexDataflowHelperFactory(
+                
metadataProvider.getStorageComponentProvider().getStorageManager(), 
secondaryFileSplitProvider);
+        return createTreeIndexBulkLoadOp(spec, fieldPermutation, 
dataflowHelperFactory,
+                GlobalConfig.DEFAULT_TREE_FILL_FACTOR);
     }
 }

Reply via email to