http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42600592/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
new file mode 100644
index 0000000..7258be8
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -0,0 +1,2256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.declared;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.config.AsterixStorageProperties;
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
+import org.apache.asterix.common.config.DatasetConfig.IndexType;
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider;
+import org.apache.asterix.common.context.ITransactionSubsystemProvider;
+import org.apache.asterix.common.context.TransactionSubsystemProvider;
+import org.apache.asterix.common.dataflow.AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor;
+import org.apache.asterix.common.dataflow.AsterixLSMTreeInsertDeleteOperatorDescriptor;
+import org.apache.asterix.common.dataflow.IAsterixApplicationContextInfo;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
+import org.apache.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallbackFactory;
+import org.apache.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
+import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
+import org.apache.asterix.common.library.ILibraryManager;
+import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
+import org.apache.asterix.common.transactions.JobId;
+import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
+import org.apache.asterix.external.adapter.factory.LookupAdapterFactory;
+import org.apache.asterix.external.api.IAdapterFactory;
+import org.apache.asterix.external.api.IDataSourceAdapter;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.indexing.IndexingConstants;
+import org.apache.asterix.external.operators.ExternalBTreeSearchOperatorDescriptor;
+import org.apache.asterix.external.operators.ExternalDataScanOperatorDescriptor;
+import org.apache.asterix.external.operators.ExternalLookupOperatorDescriptor;
+import org.apache.asterix.external.operators.ExternalRTreeSearchOperatorDescriptor;
+import org.apache.asterix.external.operators.FeedIntakeOperatorDescriptor;
+import org.apache.asterix.external.provider.AdapterFactoryProvider;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.FeedConstants;
+import org.apache.asterix.formats.base.IDataFormat;
+import org.apache.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
+import org.apache.asterix.formats.nontagged.AqlLinearizeComparatorFactoryProvider;
+import org.apache.asterix.formats.nontagged.AqlTypeTraitProvider;
+import org.apache.asterix.metadata.MetadataException;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.dataset.hints.DatasetHints.DatasetCardinalityHint;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.DatasourceAdapter;
+import org.apache.asterix.metadata.entities.Dataverse;
+import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
+import org.apache.asterix.metadata.entities.Feed;
+import org.apache.asterix.metadata.entities.FeedPolicyEntity;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails;
+import org.apache.asterix.metadata.feeds.FeedMetadataUtil;
+import org.apache.asterix.metadata.utils.DatasetUtils;
+import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry;
+import org.apache.asterix.metadata.utils.MetadataConstants;
+import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
+import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.util.NonTaggedFormatUtil;
+import org.apache.asterix.runtime.base.AsterixTupleFilterFactory;
+import org.apache.asterix.runtime.formats.FormatUtils;
+import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
+import org.apache.asterix.runtime.operators.AsterixLSMInvertedIndexUpsertOperatorDescriptor;
+import org.apache.asterix.runtime.operators.AsterixLSMTreeUpsertOperatorDescriptor;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.AsterixRuntimeComponentsProvider;
+import org.apache.asterix.runtime.util.ClusterStateManager;
+import org.apache.asterix.transaction.management.opcallbacks.LockThenSearchOperationCallbackFactory;
+import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
+import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory;
+import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerProvider;
+import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexModificationOperationCallbackFactory;
+import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
+import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexSearchOperationCallbackFactory;
+import org.apache.asterix.transaction.management.opcallbacks.TempDatasetPrimaryIndexModificationOperationCallbackFactory;
+import org.apache.asterix.transaction.management.opcallbacks.TempDatasetSecondaryIndexModificationOperationCallbackFactory;
+import org.apache.asterix.transaction.management.opcallbacks.UpsertOperationCallbackFactory;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.common.utils.Triple;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSink;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import org.apache.hyracks.algebricks.data.IAWriterFactory;
+import org.apache.hyracks.algebricks.data.IPrinterFactory;
+import org.apache.hyracks.algebricks.data.IResultSerializerFactoryProvider;
+import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.operators.std.SinkWriterRuntimeFactory;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.IResultSerializerFactory;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import org.apache.hyracks.data.std.primitive.ShortPointable;
+import org.apache.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer;
+import org.apache.hyracks.dataflow.std.file.FileSplit;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+import org.apache.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
+import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeDataflowHelperFactory;
+import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeWithBuddyDataflowHelperFactory;
+import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
+import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexBulkLoadOperatorDescriptor;
+import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.PartitionedLSMInvertedIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
+import org.apache.hyracks.storage.am.lsm.rtree.dataflow.ExternalRTreeDataflowHelperFactory;
+import org.apache.hyracks.storage.am.lsm.rtree.dataflow.LSMRTreeWithAntiMatterTuplesDataflowHelperFactory;
+import org.apache.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescriptor;
+import org.apache.hyracks.storage.am.rtree.frames.RTreePolicyType;
+
+public class MetadataProvider implements IMetadataProvider<DataSourceId, String> {
+
+    private final AsterixStorageProperties storageProperties;
+    private final ILibraryManager libraryManager;
+    private final Dataverse defaultDataverse;
+
+    private MetadataTransactionContext mdTxnCtx;
+    private boolean isWriteTransaction;
+    private Map<String, String> config;
+    private IAWriterFactory writerFactory;
+    private FileSplit outputFile;
+    private boolean asyncResults;
+    private ResultSetId resultSetId;
+    private IResultSerializerFactoryProvider resultSerializerFactoryProvider;
+    private JobId jobId;
+    private Map<String, Integer> locks;
+    private boolean isTemporaryDatasetWriteJob = true;
+
+    public MetadataProvider(Dataverse defaultDataverse) {
+        this.defaultDataverse = defaultDataverse;
+        this.storageProperties = AsterixAppContextInfo.INSTANCE.getStorageProperties();
+        this.libraryManager = AsterixAppContextInfo.INSTANCE.getLibraryManager();
+    }
+
+    public String getPropertyValue(String propertyName) {
+        return config.get(propertyName);
+    }
+
+    public void setConfig(Map<String, String> config) {
+        this.config = config;
+    }
+
+    public Map<String, String> getConfig() {
+        return config;
+    }
+
+    public ILibraryManager getLibraryManager() {
+        return libraryManager;
+    }
+
+    public void setJobId(JobId jobId) {
+        this.jobId = jobId;
+    }
+
+    public Dataverse getDefaultDataverse() {
+        return defaultDataverse;
+    }
+
+    public String getDefaultDataverseName() {
+        return defaultDataverse == null ? null : defaultDataverse.getDataverseName();
+    }
+
+    public void setWriteTransaction(boolean writeTransaction) {
+        this.isWriteTransaction = writeTransaction;
+    }
+
+    public void setWriterFactory(IAWriterFactory writerFactory) {
+        this.writerFactory = writerFactory;
+    }
+
+    public void setMetadataTxnContext(MetadataTransactionContext mdTxnCtx) {
+        this.mdTxnCtx = mdTxnCtx;
+    }
+
+    public MetadataTransactionContext getMetadataTxnContext() {
+        return mdTxnCtx;
+    }
+
+    public IAWriterFactory getWriterFactory() {
+        return this.writerFactory;
+    }
+
+    public FileSplit getOutputFile() {
+        return outputFile;
+    }
+
+    public void setOutputFile(FileSplit outputFile) {
+        this.outputFile = outputFile;
+    }
+
+    public boolean getResultAsyncMode() {
+        return asyncResults;
+    }
+
+    public void setResultAsyncMode(boolean asyncResults) {
+        this.asyncResults = asyncResults;
+    }
+
+    public ResultSetId getResultSetId() {
+        return resultSetId;
+    }
+
+    public void setResultSetId(ResultSetId resultSetId) {
+        this.resultSetId = resultSetId;
+    }
+
+    public void setResultSerializerFactoryProvider(IResultSerializerFactoryProvider rafp) {
+        this.resultSerializerFactoryProvider = rafp;
+    }
+
+    public IResultSerializerFactoryProvider getResultSerializerFactoryProvider() {
+        return resultSerializerFactoryProvider;
+    }
+
+    public boolean isWriteTransaction() {
+        // The transaction writes persistent datasets.
+        return isWriteTransaction;
+    }
+
+    public boolean isTemporaryDatasetWriteJob() {
+        // The transaction only writes temporary datasets.
+        return isTemporaryDatasetWriteJob;
+    }
+
+    public IDataFormat getFormat() {
+        return FormatUtils.getDefaultFormat();
+    }
+
+    public AsterixStorageProperties getStorageProperties() {
+        return storageProperties;
+    }
+
+    public Map<String, Integer> getLocks() {
+        return locks;
+    }
+
+    public void setLocks(Map<String, Integer> locks) {
+        this.locks = locks;
+    }
+
+    /**
+     * Retrieve the Output RecordType, as defined by "set output-record-type".
+     */
+    public ARecordType findOutputRecordType() throws AlgebricksException {
+        return MetadataManagerUtil.findOutputRecordType(mdTxnCtx, getDefaultDataverseName(),
+                getPropertyValue("output-record-type"));
+    }
+
+    public Dataset findDataset(String dataverse, String dataset) throws AlgebricksException {
+        String dv = dataverse == null ? (defaultDataverse == null ? null : defaultDataverse.getDataverseName())
+                : dataverse;
+        if (dv == null) {
+            return null;
+        }
+        return MetadataManagerUtil.findDataset(mdTxnCtx, dv, dataset);
+    }
+
+    public INodeDomain findNodeDomain(String nodeGroupName) throws AlgebricksException {
+        return MetadataManagerUtil.findNodeDomain(mdTxnCtx, nodeGroupName);
+    }
+
+    public IAType findType(String dataverse, String typeName) throws AlgebricksException {
+        return MetadataManagerUtil.findType(mdTxnCtx, dataverse, typeName);
+    }
+
+    public Feed findFeed(String dataverse, String feedName) throws AlgebricksException {
+        return MetadataManagerUtil.findFeed(mdTxnCtx, dataverse, feedName);
+    }
+
+    public FeedPolicyEntity findFeedPolicy(String dataverse, String policyName) throws AlgebricksException {
+        return MetadataManagerUtil.findFeedPolicy(mdTxnCtx, dataverse, policyName);
+    }
+
+    @Override
+    public DataSource findDataSource(DataSourceId id) throws AlgebricksException {
+        return MetadataManagerUtil.findDataSource(mdTxnCtx, id);
+    }
+
+    public DataSource lookupSourceInMetadata(DataSourceId aqlId) throws AlgebricksException {
+        return MetadataManagerUtil.lookupSourceInMetadata(mdTxnCtx, aqlId);
+    }
+
+    @Override
+    public IDataSourceIndex<String, DataSourceId> findDataSourceIndex(String indexId, DataSourceId dataSourceId)
+            throws AlgebricksException {
+        DataSource source = findDataSource(dataSourceId);
+        Dataset dataset = ((DatasetDataSource) source).getDataset();
+        try {
+            String indexName = indexId;
+            Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+                    dataset.getDatasetName(), indexName);
+            if (secondaryIndex != null) {
+                return new DataSourceIndex(secondaryIndex, dataset.getDataverseName(), dataset.getDatasetName(), this);
+            } else {
+                Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+                        dataset.getDatasetName(), dataset.getDatasetName());
+                if (primaryIndex.getIndexName().equals(indexId)) {
+                    return new DataSourceIndex(primaryIndex, dataset.getDataverseName(), dataset.getDatasetName(),
+                            this);
+                } else {
+                    return null;
+                }
+            }
+        } catch (MetadataException me) {
+            throw new AlgebricksException(me);
+        }
+    }
+
+    public List<Index> getDatasetIndexes(String dataverseName, String datasetName) throws AlgebricksException {
+        return MetadataManagerUtil.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
+    }
+
+    @Override
+    public IFunctionInfo lookupFunction(FunctionIdentifier fid) {
+        return AsterixBuiltinFunctions.lookupFunction(fid);
+    }
+
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getScannerRuntime(
+            IDataSource<DataSourceId> dataSource, List<LogicalVariable> scanVariables,
+            List<LogicalVariable> projectVariables, boolean projectPushed, List<LogicalVariable> minFilterVars,
+            List<LogicalVariable> maxFilterVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
+            JobGenContext context, JobSpecification jobSpec, Object implConfig) throws AlgebricksException {
+        try {
+            return ((DataSource) dataSource).buildDatasourceScanRuntime(this, dataSource, scanVariables,
+                    projectVariables, projectPushed, minFilterVars, maxFilterVars, opSchema, typeEnv, context, jobSpec,
+                    implConfig);
+        } catch (AsterixException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    public static AlgebricksAbsolutePartitionConstraint determineLocationConstraint(FeedDataSource feedDataSource) {
+        return new AlgebricksAbsolutePartitionConstraint(feedDataSource.getLocations());
+    }
+
+    protected Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildLoadableDatasetScan(
+            JobSpecification jobSpec, IAdapterFactory adapterFactory, RecordDescriptor rDesc)
+            throws AlgebricksException {
+        ExternalDataScanOperatorDescriptor dataScanner = new ExternalDataScanOperatorDescriptor(jobSpec, rDesc,
+                adapterFactory);
+        AlgebricksPartitionConstraint constraint;
+        try {
+            constraint = adapterFactory.getPartitionConstraint();
+        } catch (Exception e) {
+            throw new AlgebricksException(e);
+        }
+        return new Pair<>(dataScanner, constraint);
+    }
+
+    public IDataFormat getDataFormat(String dataverseName) throws AsterixException {
+        Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
+        IDataFormat format;
+        try {
+            format = (IDataFormat) Class.forName(dataverse.getDataFormat()).newInstance();
+        } catch (Exception e) {
+            throw new AsterixException(e);
+        }
+        return format;
+    }
+
+    public Dataverse findDataverse(String dataverseName) throws AsterixException {
+        return MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
+    }
+
+    public Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IAdapterFactory> buildFeedIntakeRuntime(
+            JobSpecification jobSpec, Feed primaryFeed, FeedPolicyAccessor policyAccessor) throws Exception {
+        Triple<IAdapterFactory, RecordDescriptor, IDataSourceAdapter.AdapterType> factoryOutput;
+        factoryOutput = FeedMetadataUtil.getPrimaryFeedFactoryAndOutput(primaryFeed, policyAccessor, mdTxnCtx,
+                libraryManager);
+        ARecordType recordType = FeedMetadataUtil.getOutputType(primaryFeed, primaryFeed.getAdapterConfiguration(),
+                ExternalDataConstants.KEY_TYPE_NAME);
+        IAdapterFactory adapterFactory = factoryOutput.first;
+        FeedIntakeOperatorDescriptor feedIngestor = null;
+        switch (factoryOutput.third) {
+            case INTERNAL:
+                feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, primaryFeed, adapterFactory, recordType,
+                        policyAccessor, factoryOutput.second);
+                break;
+            case EXTERNAL:
+                String libraryName = primaryFeed.getAdapterName().trim()
+                        .split(FeedConstants.NamingConstants.LIBRARY_NAME_SEPARATOR)[0];
+                feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, primaryFeed, libraryName,
+                        adapterFactory.getClass().getName(), recordType, policyAccessor, factoryOutput.second);
+                break;
+        }
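+        // Editorial note (assumption): an external adapter name is expected to have the form
+        // "<library><separator><adapter>", so the LIBRARY_NAME_SEPARATOR split above extracts the library name.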
+
+        AlgebricksPartitionConstraint partitionConstraint = adapterFactory.getPartitionConstraint();
+        return new Triple<>(feedIngestor, partitionConstraint, adapterFactory);
+    }
+
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildBtreeRuntime(JobSpecification jobSpec,
+            List<LogicalVariable> outputVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
+            JobGenContext context, boolean retainInput, boolean retainMissing, Dataset dataset, String indexName,
+            int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive,
+            Object implConfig, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes) throws AlgebricksException {
+        boolean isSecondary = true;
+        int numSecondaryKeys = 0;
+        try {
+            boolean temp = dataset.getDatasetDetails().isTemp();
+            Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+                    dataset.getDatasetName(), dataset.getDatasetName());
+            if (primaryIndex != null && (dataset.getDatasetType() != DatasetType.EXTERNAL)) {
+                isSecondary = !indexName.equals(primaryIndex.getIndexName());
+            }
+            int numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
+            RecordDescriptor outputRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
+            int[] bloomFilterKeyFields;
+            ITypeTraits[] typeTraits;
+            IBinaryComparatorFactory[] comparatorFactories;
+
+            ARecordType itemType = (ARecordType) this.findType(dataset.getItemTypeDataverseName(),
+                    dataset.getItemTypeName());
+            ARecordType metaType = null;
+            List<Integer> primaryKeyIndicators = null;
+            if (dataset.hasMetaPart()) {
+                metaType = (ARecordType) findType(dataset.getMetaItemTypeDataverseName(),
+                        dataset.getMetaItemTypeName());
+                primaryKeyIndicators = ((InternalDatasetDetails) dataset.getDatasetDetails()).getKeySourceIndicator();
+            }
+
+            ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
+            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+                    itemType, context.getBinaryComparatorFactoryProvider());
+            int[] filterFields = null;
+            int[] btreeFields = null;
+
+            if (isSecondary) {
+                Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+                        dataset.getDatasetName(), indexName);
+                numSecondaryKeys = secondaryIndex.getKeyFieldNames().size();
+                bloomFilterKeyFields = new int[numSecondaryKeys];
+                for (int i = 0; i < numSecondaryKeys; i++) {
+                    bloomFilterKeyFields[i] = i;
+                }
+                Pair<IBinaryComparatorFactory[], ITypeTraits[]> comparatorFactoriesAndTypeTraits =
+                        getComparatorFactoriesAndTypeTraitsOfSecondaryBTreeIndex(
+                                secondaryIndex.getKeyFieldNames(), secondaryIndex.getKeyFieldTypes(),
+                                DatasetUtils.getPartitioningKeys(dataset), itemType, dataset.getDatasetType(),
+                                dataset.hasMetaPart(), primaryKeyIndicators,
+                                secondaryIndex.getKeyFieldSourceIndicators(),
+                                metaType);
+                comparatorFactories = comparatorFactoriesAndTypeTraits.first;
+                typeTraits = comparatorFactoriesAndTypeTraits.second;
+                if (filterTypeTraits != null) {
+                    filterFields = new int[1];
+                    filterFields[0] = numSecondaryKeys + numPrimaryKeys;
+                    btreeFields = new int[numSecondaryKeys + numPrimaryKeys];
+                    for (int k = 0; k < btreeFields.length; k++) {
+                        btreeFields[k] = k;
+                    }
+                }
+
+            } else {
+                bloomFilterKeyFields = new int[numPrimaryKeys];
+                for (int i = 0; i < numPrimaryKeys; i++) {
+                    bloomFilterKeyFields[i] = i;
+                }
+                // get meta item type
+                ARecordType metaItemType = DatasetUtils.getMetaType(this, dataset);
+                typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType, metaItemType);
+                comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset, itemType, metaItemType,
+                        context.getBinaryComparatorFactoryProvider());
+                filterFields = DatasetUtils.createFilterFields(dataset);
+                btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
+            }
+
+            IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
+            spPc = splitProviderAndPartitionConstraintsForDataset(dataset.getDataverseName(), dataset.getDatasetName(),
+                    indexName, temp);
+
+            ISearchOperationCallbackFactory searchCallbackFactory;
+            if (isSecondary) {
+                searchCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
+                        : new SecondaryIndexSearchOperationCallbackFactory();
+            } else {
+                JobId jobId = ((JobEventListenerFactory) jobSpec.getJobletEventListenerFactory()).getJobId();
+                int datasetId = dataset.getDatasetId();
+                int[] primaryKeyFields = new int[numPrimaryKeys];
+                for (int i = 0; i < numPrimaryKeys; i++) {
+                    primaryKeyFields[i] = i;
+                }
+
+                ITransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
+
+                /**
+                 * Due to the read-committed isolation level,
+                 * we may acquire a very short duration lock (i.e., an instant lock) for readers.
+                 */
+                searchCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
+                        : new PrimaryIndexInstantSearchOperationCallbackFactory(jobId, datasetId, primaryKeyFields,
+                                txnSubsystemProvider, ResourceType.LSM_BTREE);
+            }
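+            // Editorial note: temporary datasets fall back to NoOpOperationCallbackFactory above, presumably
+            // because they bypass transactional locking and logging; persistent datasets use the transactional
+            // search callback factories instead.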
+            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
+                    .getMergePolicyFactory(dataset, mdTxnCtx);
+            AsterixRuntimeComponentsProvider rtcProvider = AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
+            BTreeSearchOperatorDescriptor btreeSearchOp;
+            if (dataset.getDatasetType() == DatasetType.INTERNAL) {
+                btreeSearchOp = new BTreeSearchOperatorDescriptor(jobSpec, outputRecDesc,
+                        appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
+                        spPc.first, typeTraits, comparatorFactories, bloomFilterKeyFields, lowKeyFields, highKeyFields,
+                        lowKeyInclusive, highKeyInclusive,
+                        new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
+                                compactionInfo.first, compactionInfo.second,
+                                isSecondary ? new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId())
+                                        : new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                                rtcProvider, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+                                storageProperties.getBloomFilterFalsePositiveRate(), !isSecondary, filterTypeTraits,
+                                filterCmpFactories, btreeFields, filterFields, !temp),
+                        retainInput, retainMissing, context.getMissingWriterFactory(), searchCallbackFactory,
+                        minFilterFieldIndexes, maxFilterFieldIndexes);
+            } else {
+                // External dataset: use the BTree with a buddy BTree.
+                // Be careful of the key start index?
+                int[] buddyBreeFields = new int[] { numSecondaryKeys };
+                ExternalBTreeWithBuddyDataflowHelperFactory indexDataflowHelperFactory =
+                        new ExternalBTreeWithBuddyDataflowHelperFactory(
+                                compactionInfo.first, compactionInfo.second,
+                                new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                                LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE,
+                                getStorageProperties().getBloomFilterFalsePositiveRate(), buddyBreeFields,
+                                ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, this), !temp);
+                btreeSearchOp = new ExternalBTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, rtcProvider,
+                        rtcProvider, spPc.first, typeTraits, comparatorFactories, bloomFilterKeyFields, lowKeyFields,
+                        highKeyFields, lowKeyInclusive, highKeyInclusive, indexDataflowHelperFactory, retainInput,
+                        retainMissing, context.getMissingWriterFactory(), searchCallbackFactory);
+            }
+            return new Pair<>(btreeSearchOp, spPc.second);
+        } catch (MetadataException me) {
+            throw new AlgebricksException(me);
+        }
+    }
+
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildRtreeRuntime(JobSpecification jobSpec,
+            List<LogicalVariable> outputVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
+            JobGenContext context, boolean retainInput, boolean retainMissing, Dataset dataset, String indexName,
+            int[] keyFields, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes) throws AlgebricksException {
+        try {
+            ARecordType recType = (ARecordType) findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
+            int numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
+
+            boolean temp = dataset.getDatasetDetails().isTemp();
+            Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+                    dataset.getDatasetName(), indexName);
+            if (secondaryIndex == null) {
+                throw new AlgebricksException(
+                        "Code generation error: no index " + indexName + " for dataset " + dataset.getDatasetName());
+            }
+            List<List<String>> secondaryKeyFields = secondaryIndex.getKeyFieldNames();
+            List<IAType> secondaryKeyTypes = secondaryIndex.getKeyFieldTypes();
+            int numSecondaryKeys = secondaryKeyFields.size();
+            if (numSecondaryKeys != 1) {
+                throw new AlgebricksException(
+                        "Cannot use " + numSecondaryKeys + " fields as a key for the R-tree index. "
+                                + "There can be only one field as a key for the R-tree index.");
+            }
+            Pair<IAType, Boolean> keyTypePair = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0),
+                    secondaryKeyFields.get(0), recType);
+            IAType keyType = keyTypePair.first;
+            if (keyType == null) {
+                throw new AlgebricksException("Could not find field " + secondaryKeyFields.get(0) + " in the schema.");
+            }
+            int numDimensions = NonTaggedFormatUtil.getNumDimensions(keyType.getTypeTag());
+            boolean isPointMBR = keyType.getTypeTag() == ATypeTag.POINT || keyType.getTypeTag() == ATypeTag.POINT3D;
+            int numNestedSecondaryKeyFields = numDimensions * 2;
+            IPrimitiveValueProviderFactory[] valueProviderFactories =
+                    new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
+            for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
+                valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
+            }
+
+            RecordDescriptor outputRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
+            // IS NOT THE VARIABLE BELOW ALWAYS = 0 ??
+            int keysStartIndex = outputRecDesc.getFieldCount() - numNestedSecondaryKeyFields - numPrimaryKeys;
+            if (retainInput) {
+                keysStartIndex -= numNestedSecondaryKeyFields;
+            }
+            IBinaryComparatorFactory[] comparatorFactories = JobGenHelper.variablesToAscBinaryComparatorFactories(
+                    outputVars, keysStartIndex, numNestedSecondaryKeyFields, typeEnv, context);
+            ITypeTraits[] typeTraits = JobGenHelper.variablesToTypeTraits(outputVars, keysStartIndex,
+                    numNestedSecondaryKeyFields + numPrimaryKeys, typeEnv, context);
+            IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc =
+                    splitProviderAndPartitionConstraintsForDataset(dataset.getDataverseName(),
+                            dataset.getDatasetName(), indexName, temp);
+            ARecordType metaType = null;
+            if (dataset.hasMetaPart()) {
+                metaType = (ARecordType) findType(dataset.getMetaItemTypeDataverseName(),
+                        dataset.getMetaItemTypeName());
+            }
+
+            IBinaryComparatorFactory[] primaryComparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
+                    dataset, recType, metaType, context.getBinaryComparatorFactoryProvider());
+            int[] btreeFields = new int[primaryComparatorFactories.length];
+            for (int i = 0; i < btreeFields.length; i++) {
+                btreeFields[i] = i + numNestedSecondaryKeyFields;
+            }
+
+            ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType);
+            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+                    recType, context.getBinaryComparatorFactoryProvider());
+            int[] filterFields = null;
+            int[] rtreeFields = null;
+            if (filterTypeTraits != null) {
+                filterFields = new int[1];
+                filterFields[0] = numNestedSecondaryKeyFields + numPrimaryKeys;
+                rtreeFields = new int[numNestedSecondaryKeyFields + numPrimaryKeys];
+                for (int i = 0; i < rtreeFields.length; i++) {
+                    rtreeFields[i] = i;
+                }
+            }
+
+            IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(keyType.getTypeTag());
+            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
+                    .getMergePolicyFactory(dataset, mdTxnCtx);
+            ISearchOperationCallbackFactory searchCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
+                    : new SecondaryIndexSearchOperationCallbackFactory();
+
+            RTreeSearchOperatorDescriptor rtreeSearchOp;
+            if (dataset.getDatasetType() == DatasetType.INTERNAL) {
+                IBinaryComparatorFactory[] deletedKeyBTreeCompFactories = getMergedComparatorFactories(
+                        comparatorFactories, primaryComparatorFactories);
+                IIndexDataflowHelperFactory idff = new LSMRTreeWithAntiMatterTuplesDataflowHelperFactory(
+                        valueProviderFactories, RTreePolicyType.RTREE, deletedKeyBTreeCompFactories,
+                        new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), compactionInfo.first,
+                        compactionInfo.second, new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMRTreeIOOperationCallbackFactory.INSTANCE,
+                        MetadataProvider.proposeLinearizer(nestedKeyType.getTypeTag(), comparatorFactories.length),
+                        rtreeFields, filterTypeTraits, filterCmpFactories, filterFields, !temp, isPointMBR);
+                rtreeSearchOp = new RTreeSearchOperatorDescriptor(jobSpec, outputRecDesc,
+                        appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
+                        spPc.first, typeTraits, comparatorFactories, keyFields, idff, retainInput, retainMissing,
+                        context.getMissingWriterFactory(), searchCallbackFactory, minFilterFieldIndexes,
+                        maxFilterFieldIndexes);
+            } else {
+                // External Dataset
+                ExternalRTreeDataflowHelperFactory indexDataflowHelperFactory = new ExternalRTreeDataflowHelperFactory(
+                        valueProviderFactories, RTreePolicyType.RTREE,
+                        IndexingConstants.getBuddyBtreeComparatorFactories(), compactionInfo.first,
+                        compactionInfo.second, new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMRTreeIOOperationCallbackFactory.INSTANCE,
+                        proposeLinearizer(nestedKeyType.getTypeTag(), comparatorFactories.length),
+                        getStorageProperties().getBloomFilterFalsePositiveRate(),
+                        new int[] { numNestedSecondaryKeyFields },
+                        ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, this), !temp, isPointMBR);
+                // Create the operator
+                rtreeSearchOp = new ExternalRTreeSearchOperatorDescriptor(jobSpec, outputRecDesc,
+                        appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
+                        spPc.first, typeTraits, comparatorFactories, keyFields, indexDataflowHelperFactory, retainInput,
+                        retainMissing, context.getMissingWriterFactory(), searchCallbackFactory);
+            }
+
+            return new Pair<>(rtreeSearchOp, spPc.second);
+        } catch (MetadataException me) {
+            throw new AlgebricksException(me);
+        }
+    }
+
+    @Override
+    public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink,
+            int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc) {
+        FileSplitDataSink fsds = (FileSplitDataSink) sink;
+        FileSplitSinkId fssi = fsds.getId();
+        FileSplit fs = fssi.getFileSplit();
+        File outFile = fs.getLocalFile().getFile();
+        String nodeId = fs.getNodeName();
+
+        SinkWriterRuntimeFactory runtime = new SinkWriterRuntimeFactory(printColumns, printerFactories, outFile,
+                getWriterFactory(), inputDesc);
+        AlgebricksPartitionConstraint apc = new AlgebricksAbsolutePartitionConstraint(new String[] { nodeId });
+        return new Pair<>(runtime, apc);
+    }
+
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,
+            int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc, boolean ordered,
+            JobSpecification spec) throws AlgebricksException {
+        ResultSetDataSink rsds = (ResultSetDataSink) sink;
+        ResultSetSinkId rssId = rsds.getId();
+        ResultSetId rsId = rssId.getResultSetId();
+        ResultWriterOperatorDescriptor resultWriter = null;
+        try {
+            IResultSerializerFactory resultSerializedAppenderFactory = resultSerializerFactoryProvider
+                    .getAqlResultSerializerFactoryProvider(printColumns, printerFactories, getWriterFactory());
+            resultWriter = new ResultWriterOperatorDescriptor(spec, rsId, ordered, getResultAsyncMode(),
+                    resultSerializedAppenderFactory);
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+        return new Pair<>(resultWriter, null);
+    }
+
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(
+            IDataSource<DataSourceId> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
+            LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields, JobGenContext context,
+            JobSpecification spec) throws AlgebricksException {
+        String dataverseName = dataSource.getId().getDataverseName();
+        String datasetName = dataSource.getId().getDatasourceName();
+
+        Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
+        int numKeys = keys.size();
+        int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
+
+        // move key fields to front
+        int[] fieldPermutation = new int[numKeys + 1 + numFilterFields];
+        int[] bloomFilterKeyFields = new int[numKeys];
+        int i = 0;
+        for (LogicalVariable varKey : keys) {
+            int idx = propagatedSchema.findVariable(varKey);
+            fieldPermutation[i] = idx;
+            bloomFilterKeyFields[i] = i;
+            i++;
+        }
+        fieldPermutation[numKeys] = propagatedSchema.findVariable(payload);
+        if (numFilterFields > 0) {
+            int idx = propagatedSchema.findVariable(additionalNonKeyFields.get(0));
+            fieldPermutation[numKeys + 1] = idx;
+        }
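+        // Illustrative example: with two primary keys and one filter field, the permutation built above yields
+        // the tuple layout [key0, key1, payload, filter], i.e. keys first, then the payload, then the filter field.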
+
+        try {
+            boolean temp = dataset.getDatasetDetails().isTemp();
+            isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
+
+            Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+                    dataset.getDatasetName(), dataset.getDatasetName());
+            String indexName = primaryIndex.getIndexName();
+
+            ARecordType metaType = null;
+            if (dataset.hasMetaPart()) {
+                metaType = (ARecordType) findType(dataset.getMetaItemTypeDataverseName(),
+                        dataset.getMetaItemTypeName());
+            }
+
+            String itemTypeName = dataset.getItemTypeName();
+            ARecordType itemType = (ARecordType) MetadataManager.INSTANCE
+                    .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName).getDatatype();
+            ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType, null);
+            IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
+                    itemType, metaType, context.getBinaryComparatorFactoryProvider());
+
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
+                    splitProviderAndPartitionConstraintsForDataset(dataSource.getId().getDataverseName(),
+                            datasetName, indexName, temp);
+            IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+
+            long numElementsHint = getCardinalityPerPartitionHint(dataset);
+
+            ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
+            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+                    itemType, context.getBinaryComparatorFactoryProvider());
+            int[] filterFields = DatasetUtils.createFilterFields(dataset);
+            int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
+
+            // TODO
+            // figure out the right behavior of the bulkload and then give the
+            // right callback
+            // (ex. what's the expected behavior when there is an error during
+            // bulkload?)
+            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
+                    .getMergePolicyFactory(dataset, mdTxnCtx);
+            TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec, null,
+                    appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
+                    splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields, fieldPermutation,
+                    GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, true,
+                    new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
+                            compactionInfo.first, compactionInfo.second,
+                            new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                            LSMBTreeIOOperationCallbackFactory.INSTANCE,
+                            storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits,
+                            filterCmpFactories, btreeFields, filterFields, !temp));
+            return new Pair<>(btreeBulkLoad, splitsAndConstraint.second);
+        } catch (MetadataException me) {
+            throw new AlgebricksException(me);
+        }
+    }
+
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(
+            IDataSource<DataSourceId> dataSource, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
+            List<LogicalVariable> keys, LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields,
+            List<LogicalVariable> additionalNonFilteringFields, RecordDescriptor recordDesc, JobGenContext context,
+            JobSpecification spec, boolean bulkload) throws AlgebricksException {
+        return getInsertOrDeleteRuntime(IndexOperation.INSERT, dataSource, propagatedSchema, keys, payload,
+                additionalNonKeyFields, recordDesc, context, spec, bulkload, additionalNonFilteringFields);
+    }
+
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getDeleteRuntime(
+            IDataSource<DataSourceId> dataSource, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
+            List<LogicalVariable> keys, LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields,
+            RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec) throws AlgebricksException {
+        return getInsertOrDeleteRuntime(IndexOperation.DELETE, dataSource, propagatedSchema, keys, payload,
+                additionalNonKeyFields, recordDesc, context, spec, false, null);
+    }
+
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertRuntime(
+            IDataSourceIndex<String, DataSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
+            IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
+            List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
+            ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec,
+            boolean bulkload) throws AlgebricksException {
+        return getIndexInsertOrDeleteOrUpsertRuntime(IndexOperation.INSERT, dataSourceIndex, propagatedSchema,
+                inputSchemas, typeEnv, primaryKeys, secondaryKeys, additionalNonKeyFields, filterExpr, recordDesc,
+                context, spec, bulkload, null, null);
+    }
+
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexDeleteRuntime(
+            IDataSourceIndex<String, DataSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
+            IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
+            List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
+            ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
+            throws AlgebricksException {
+        return getIndexInsertOrDeleteOrUpsertRuntime(IndexOperation.DELETE, dataSourceIndex, propagatedSchema,
+                inputSchemas, typeEnv, primaryKeys, secondaryKeys, additionalNonKeyFields, filterExpr, recordDesc,
+                context, spec, false, null, null);
+    }
+
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexUpsertRuntime(
+            IDataSourceIndex<String, DataSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
+            IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
+            List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalFilteringKeys,
+            ILogicalExpression filterExpr, List<LogicalVariable> prevSecondaryKeys,
+            LogicalVariable prevAdditionalFilteringKey, RecordDescriptor recordDesc, JobGenContext context,
+            JobSpecification spec) throws AlgebricksException {
+        return getIndexInsertOrDeleteOrUpsertRuntime(IndexOperation.UPSERT, dataSourceIndex, propagatedSchema,
+                inputSchemas, typeEnv, primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, recordDesc,
+                context, spec, false, prevSecondaryKeys, prevAdditionalFilteringKey);
+    }
+
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getTokenizerRuntime(
+            IDataSourceIndex<String, DataSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
+            IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
+            List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
+            JobGenContext context, JobSpecification spec, boolean bulkload) throws AlgebricksException {
+
+        String indexName = dataSourceIndex.getId();
+        String dataverseName = dataSourceIndex.getDataSource().getId().getDataverseName();
+        String datasetName = dataSourceIndex.getDataSource().getId().getDatasourceName();
+
+        IOperatorSchema inputSchema;
+        if (inputSchemas.length > 0) {
+            inputSchema = inputSchemas[0];
+        } else {
+            throw new AlgebricksException("TokenizeOperator can not operate without any input variable.");
+        }
+
+        Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
+        Index secondaryIndex;
+        try {
+            secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+                    dataset.getDatasetName(), indexName);
+        } catch (MetadataException e) {
+            throw new AlgebricksException(e);
+        }
+        // TokenizeOperator only supports a keyword or n-gram index.
+        switch (secondaryIndex.getIndexType()) {
+            case SINGLE_PARTITION_WORD_INVIX:
+            case SINGLE_PARTITION_NGRAM_INVIX:
+            case LENGTH_PARTITIONED_WORD_INVIX:
+            case LENGTH_PARTITIONED_NGRAM_INVIX:
+                return getBinaryTokenizerRuntime(dataverseName, datasetName, indexName, inputSchema, propagatedSchema,
+                        primaryKeys, secondaryKeys, recordDesc, spec, secondaryIndex.getIndexType());
+            default:
+                throw new AlgebricksException("Currently, we do not support TokenizeOperator for the index type: "
+                        + secondaryIndex.getIndexType());
+        }
+    }
+
+    /**
+     * Calculate an estimate of the bloom filter size. Note that this is an
+     * estimate which assumes that the data is uniformly
+     * distributed across all partitions.
+     *
+     * @param dataset
+     * @return Number of elements that will be used to create a bloom filter per
+     *         dataset per partition
+     * @throws MetadataException
+     * @throws AlgebricksException
+     */
+    public long getCardinalityPerPartitionHint(Dataset dataset) throws MetadataException, AlgebricksException {
+        String numElementsHintString = dataset.getHints().get(DatasetCardinalityHint.NAME);
+        long numElementsHint;
+        if (numElementsHintString == null) {
+            numElementsHint = DatasetCardinalityHint.DEFAULT;
+        } else {
+            numElementsHint = Long.parseLong(numElementsHintString);
+        }
+        int numPartitions = 0;
+        List<String> nodeGroup = 
MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName())
+                .getNodeNames();
+        for (String nd : nodeGroup) {
+            numPartitions += 
ClusterStateManager.INSTANCE.getNodePartitionsCount(nd);
+        }
+        numElementsHint = numElementsHint / numPartitions;
+        return numElementsHint;
+    }
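+    // Editor's note (illustrative only, not part of the original change): a worked
+    // example of the arithmetic above, assuming a hypothetical cardinality hint of
+    // 4,000,000 on a node group of two nodes with four storage partitions each:
+    //   numPartitions   = 4 + 4 = 8
+    //   numElementsHint = 4,000,000 / 8 = 500,000
+    // i.e. each partition's bloom filter is sized for 500,000 elements.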
+
+    protected IAdapterFactory getConfiguredAdapterFactory(Dataset dataset, 
String adapterName,
+            Map<String, String> configuration, ARecordType itemType, boolean 
isPKAutoGenerated,
+            List<List<String>> primaryKeys, ARecordType metaType) throws 
AlgebricksException {
+        try {
+            configuration.put(ExternalDataConstants.KEY_DATAVERSE, 
dataset.getDataverseName());
+            IAdapterFactory adapterFactory = 
AdapterFactoryProvider.getAdapterFactory(libraryManager, adapterName,
+                    configuration, itemType, metaType);
+
+            // check to see if dataset is indexed
+            Index filesIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, 
dataset.getDataverseName(),
+                    dataset.getDatasetName(),
+                    
dataset.getDatasetName().concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX));
+
+            if (filesIndex != null && filesIndex.getPendingOp() == 0) {
+                // get files
+                List<ExternalFile> files = 
MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, dataset);
+                Iterator<ExternalFile> iterator = files.iterator();
+                while (iterator.hasNext()) {
+                    if (iterator.next().getPendingOp() != 
ExternalFilePendingOp.PENDING_NO_OP) {
+                        iterator.remove();
+                    }
+                }
+            }
+
+            return adapterFactory;
+        } catch (Exception e) {
+            throw new AlgebricksException("Unable to create adapter", e);
+        }
+    }
+
+    public JobId getJobId() {
+        return jobId;
+    }
+
+    public static ILinearizeComparatorFactory proposeLinearizer(ATypeTag 
keyType, int numKeyFields)
+            throws AlgebricksException {
+        return 
AqlLinearizeComparatorFactoryProvider.INSTANCE.getLinearizeComparatorFactory(keyType,
 true,
+                numKeyFields / 2);
+    }
+
+    public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitProviderAndPartitionConstraintsForDataset(
+            String dataverseName, String datasetName, String targetIdxName, 
boolean temp) throws AlgebricksException {
+        FileSplit[] splits = splitsForDataset(mdTxnCtx, dataverseName, 
datasetName, targetIdxName, temp);
+        return StoragePathUtil.splitProviderAndPartitionConstraints(splits);
+    }
+
+    public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitProviderAndPartitionConstraintsForDataverse(
+            String dataverse) {
+        return 
SplitsAndConstraintsUtil.splitProviderAndPartitionConstraintsForDataverse(dataverse);
+    }
+
+    public FileSplit[] splitsForDataset(MetadataTransactionContext mdTxnCtx, 
String dataverseName, String datasetName,
+            String targetIdxName, boolean temp) throws AlgebricksException {
+        return SplitsAndConstraintsUtil.splitsForDataset(mdTxnCtx, 
dataverseName, datasetName, targetIdxName, temp);
+    }
+
+    public DatasourceAdapter getAdapter(MetadataTransactionContext mdTxnCtx, 
String dataverseName, String adapterName)
+            throws MetadataException {
+        DatasourceAdapter adapter;
+        // search in default namespace (built-in adapter)
+        adapter = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, 
MetadataConstants.METADATA_DATAVERSE_NAME, adapterName);
+
+        // search in dataverse (user-defined adapter)
+        if (adapter == null) {
+            adapter = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, 
dataverseName, adapterName);
+        }
+        return adapter;
+    }
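+    // Editor's note (illustrative only): the resolution order above means a
+    // built-in adapter shadows a user-defined adapter of the same name. For a
+    // hypothetical call
+    //   getAdapter(mdTxnCtx, "MyDataverse", "my_adapter");
+    // the Metadata dataverse is consulted first, and "MyDataverse" is searched
+    // only when no built-in adapter named "my_adapter" exists.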
+
+    public AlgebricksAbsolutePartitionConstraint getClusterLocations() {
+        return ClusterStateManager.INSTANCE.getClusterLocations();
+    }
+
+    public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitProviderAndPartitionConstraintsForFilesIndex(
+            String dataverseName, String datasetName, String targetIdxName, 
boolean create) throws AlgebricksException {
+        return 
SplitsAndConstraintsUtil.splitProviderAndPartitionConstraintsForFilesIndex(mdTxnCtx,
 dataverseName,
+                datasetName, targetIdxName, create);
+    }
+
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
buildExternalDataLookupRuntime(
+            JobSpecification jobSpec, Dataset dataset, Index secondaryIndex, 
int[] ridIndexes, boolean retainInput,
+            IVariableTypeEnvironment typeEnv, List<LogicalVariable> 
outputVars, IOperatorSchema opSchema,
+            JobGenContext context, MetadataProvider metadataProvider, boolean 
retainMissing)
+            throws AlgebricksException {
+        try {
+            // Get data type
+            IAType itemType;
+            itemType = 
MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(),
+                    dataset.getDataverseName(), 
dataset.getItemTypeName()).getDatatype();
+
+            // Create the adapter factory. Right now there is only one; if more
+            // are added in the future, this can become a map.
+            ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails) 
dataset.getDatasetDetails();
+            LookupAdapterFactory<?> adapterFactory = 
AdapterFactoryProvider.getLookupAdapterFactory(libraryManager,
+                    datasetDetails.getProperties(), (ARecordType) itemType, 
ridIndexes, retainInput, retainMissing,
+                    context.getMissingWriterFactory());
+
+            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo;
+            try {
+                compactionInfo = DatasetUtils.getMergePolicyFactory(dataset, 
metadataProvider.getMetadataTxnContext());
+            } catch (MetadataException e) {
+                throw new AlgebricksException(" Unabel to create merge policy 
factory for external dataset", e);
+            }
+
+            boolean temp = datasetDetails.isTemp();
+            // Create the file index data flow helper
+            ExternalBTreeDataflowHelperFactory indexDataflowHelperFactory = 
new ExternalBTreeDataflowHelperFactory(
+                    compactionInfo.first, compactionInfo.second,
+                    new 
SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                    AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, 
LSMBTreeIOOperationCallbackFactory.INSTANCE,
+                    
metadataProvider.getStorageProperties().getBloomFilterFalsePositiveRate(),
+                    
ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, 
metadataProvider), !temp);
+
+            // Create the out record descriptor, appContext and 
fileSplitProvider for the files index
+            RecordDescriptor outRecDesc = 
JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
+            IAsterixApplicationContextInfo appContext = 
(IAsterixApplicationContextInfo) context.getAppContext();
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
+            spPc = 
metadataProvider.splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(),
+                    dataset.getDatasetName(),
+                    
dataset.getDatasetName().concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX),
 false);
+            ISearchOperationCallbackFactory searchOpCallbackFactory = temp ? 
NoOpOperationCallbackFactory.INSTANCE
+                    : new SecondaryIndexSearchOperationCallbackFactory();
+            // Create the operator
+            ExternalLookupOperatorDescriptor op = new 
ExternalLookupOperatorDescriptor(jobSpec, adapterFactory,
+                    outRecDesc, indexDataflowHelperFactory, retainInput, 
appContext.getIndexLifecycleManagerProvider(),
+                    appContext.getStorageManagerInterface(), spPc.first, 
dataset.getDatasetId(),
+                    
metadataProvider.getStorageProperties().getBloomFilterFalsePositiveRate(), 
searchOpCallbackFactory,
+                    retainMissing, context.getMissingWriterFactory());
+            return new Pair<>(op, spPc.second);
+        } catch (Exception e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
getUpsertRuntime(
+            IDataSource<DataSourceId> dataSource, IOperatorSchema inputSchema, 
IVariableTypeEnvironment typeEnv,
+            List<LogicalVariable> primaryKeys, LogicalVariable payload, 
List<LogicalVariable> filterKeys,
+            List<LogicalVariable> additionalNonFilterFields, RecordDescriptor 
recordDesc, JobGenContext context,
+            JobSpecification spec) throws AlgebricksException {
+        String datasetName = dataSource.getId().getDatasourceName();
+        Dataset dataset = findDataset(dataSource.getId().getDataverseName(), 
datasetName);
+        if (dataset == null) {
+            throw new AlgebricksException(
+                    "Unknown dataset " + datasetName + " in dataverse " + 
dataSource.getId().getDataverseName());
+        }
+        boolean temp = dataset.getDatasetDetails().isTemp();
+        isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
+
+        int numKeys = primaryKeys.size();
+        int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 
: 1;
+        int numOfAdditionalFields = additionalNonFilterFields == null ? 0 : 
additionalNonFilterFields.size();
+        // Move key fields to front. [keys, record, filters]
+        int[] fieldPermutation = new int[numKeys + 1 + numFilterFields + 
numOfAdditionalFields];
+        int[] bloomFilterKeyFields = new int[numKeys];
+        int i = 0;
+        // set the keys' permutations
+        for (LogicalVariable varKey : primaryKeys) {
+            int idx = inputSchema.findVariable(varKey);
+            fieldPermutation[i] = idx;
+            bloomFilterKeyFields[i] = i;
+            i++;
+        }
+        // set the record permutation
+        fieldPermutation[i++] = inputSchema.findVariable(payload);
+        // set the filters' permutations.
+        if (numFilterFields > 0) {
+            int idx = inputSchema.findVariable(filterKeys.get(0));
+            fieldPermutation[i++] = idx;
+        }
+
+        if (additionalNonFilterFields != null) {
+            for (LogicalVariable var : additionalNonFilterFields) {
+                int idx = inputSchema.findVariable(var);
+                fieldPermutation[i++] = idx;
+            }
+        }
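+
+        // Editor's note (illustrative only): for a hypothetical dataset with a
+        // two-field primary key, one filter field, and no additional fields,
+        // fieldPermutation now maps the operator's input columns to
+        //   [pk0, pk1, record, filter]
+        // i.e. keys first, then the payload record, then the filter value, as
+        // described by the comment above.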
+
+        try {
+            Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, 
dataset.getDataverseName(),
+                    dataset.getDatasetName(), dataset.getDatasetName());
+            String indexName = primaryIndex.getIndexName();
+
+            String itemTypeName = dataset.getItemTypeName();
+            String itemTypeDataverseName = dataset.getItemTypeDataverseName();
+            ARecordType itemType = (ARecordType) MetadataManager.INSTANCE
+                    .getDatatype(mdTxnCtx, itemTypeDataverseName, 
itemTypeName).getDatatype();
+            ARecordType metaItemType = DatasetUtils.getMetaType(this, dataset);
+            ITypeTraits[] typeTraits = 
DatasetUtils.computeTupleTypeTraits(dataset, itemType, metaItemType);
+            IAsterixApplicationContextInfo appContext = 
(IAsterixApplicationContextInfo) context.getAppContext();
+            IBinaryComparatorFactory[] comparatorFactories = 
DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
+                    itemType, metaItemType, 
context.getBinaryComparatorFactoryProvider());
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitsAndConstraint =
+                    
splitProviderAndPartitionConstraintsForDataset(dataSource.getId().getDataverseName(),
 datasetName,
+                            indexName, temp);
+
+            // prepare callback
+            JobId jobId = ((JobEventListenerFactory) 
spec.getJobletEventListenerFactory()).getJobId();
+            int datasetId = dataset.getDatasetId();
+            int[] primaryKeyFields = new int[numKeys];
+            for (i = 0; i < numKeys; i++) {
+                primaryKeyFields[i] = i;
+            }
+
+            ITypeTraits[] filterTypeTraits = 
DatasetUtils.computeFilterTypeTraits(dataset, itemType);
+            IBinaryComparatorFactory[] filterCmpFactories = 
DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+                    itemType, context.getBinaryComparatorFactoryProvider());
+            int[] filterFields = DatasetUtils.createFilterFields(dataset);
+            int[] btreeFields = 
DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
+
+            TransactionSubsystemProvider txnSubsystemProvider = new 
TransactionSubsystemProvider();
+            IModificationOperationCallbackFactory modificationCallbackFactory 
= temp
+                    ? new 
TempDatasetPrimaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+                            primaryKeyFields, txnSubsystemProvider, 
IndexOperation.UPSERT, ResourceType.LSM_BTREE)
+                    : new UpsertOperationCallbackFactory(jobId, datasetId, 
primaryKeyFields, txnSubsystemProvider,
+                            IndexOperation.UPSERT, ResourceType.LSM_BTREE, 
dataset.hasMetaPart());
+
+            LockThenSearchOperationCallbackFactory searchCallbackFactory = new 
LockThenSearchOperationCallbackFactory(
+                    jobId, datasetId, primaryKeyFields, txnSubsystemProvider, 
ResourceType.LSM_BTREE);
+
+            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = 
DatasetUtils
+                    .getMergePolicyFactory(dataset, mdTxnCtx);
+            IIndexDataflowHelperFactory idfh = new 
LSMBTreeDataflowHelperFactory(
+                    new AsterixVirtualBufferCacheProvider(datasetId), 
compactionInfo.first, compactionInfo.second,
+                    new 
PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                    AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, 
LSMBTreeIOOperationCallbackFactory.INSTANCE,
+                    storageProperties.getBloomFilterFalsePositiveRate(), true, 
filterTypeTraits, filterCmpFactories,
+                    btreeFields, filterFields, !temp);
+            AsterixLSMTreeUpsertOperatorDescriptor op;
+
+            ITypeTraits[] outputTypeTraits = new 
ITypeTraits[recordDesc.getFieldCount()
+                    + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
+            ISerializerDeserializer[] outputSerDes = new 
ISerializerDeserializer[recordDesc.getFieldCount()
+                    + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
+
+            // add the previous record first
+            int f = 0;
+            outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(itemType);
+            outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(itemType);
+            f++;
+            // add the previous meta second
+            if (dataset.hasMetaPart()) {
+                outputSerDes[f] = 
FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(
+                        metaItemType);
+                outputTypeTraits[f] = 
FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(metaItemType);
+                f++;
+            }
+            // add the previous filter third
+            int fieldIdx = -1;
+            if (numFilterFields > 0) {
+                String filterField = 
DatasetUtils.getFilterField(dataset).get(0);
+                for (i = 0; i < itemType.getFieldNames().length; i++) {
+                    if (itemType.getFieldNames()[i].equals(filterField)) {
+                        break;
+                    }
+                }
+                fieldIdx = i;
+                outputTypeTraits[f] = 
FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(itemType
+                        .getFieldTypes()[fieldIdx]);
+                outputSerDes[f] = 
FormatUtils.getDefaultFormat().getSerdeProvider()
+                        
.getSerializerDeserializer(itemType.getFieldTypes()[fieldIdx]);
+                f++;
+            }
+            for (int j = 0; j < recordDesc.getFieldCount(); j++) {
+                outputTypeTraits[j + f] = recordDesc.getTypeTraits()[j];
+                outputSerDes[j + f] = recordDesc.getFields()[j];
+            }
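+
+            // Editor's note (illustrative only): at this point the output layout is
+            //   [prev record, prev meta (if any), prev filter (if any), input fields...]
+            // so for a dataset with a meta part and a filter field, f == 3 and the
+            // original input columns are appended starting at index 3.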
+
+            RecordDescriptor outputRecordDesc = new 
RecordDescriptor(outputSerDes, outputTypeTraits);
+            op = new AsterixLSMTreeUpsertOperatorDescriptor(spec, 
outputRecordDesc,
+                    appContext.getStorageManagerInterface(), 
appContext.getIndexLifecycleManagerProvider(),
+                    splitsAndConstraint.first, typeTraits, 
comparatorFactories, bloomFilterKeyFields, fieldPermutation,
+                    idfh, null, true, indexName, 
context.getMissingWriterFactory(), modificationCallbackFactory,
+                    searchCallbackFactory, null);
+            op.setType(itemType);
+            op.setFilterIndex(fieldIdx);
+            return new Pair<>(op, splitsAndConstraint.second);
+
+        } catch (MetadataException me) {
+            throw new AlgebricksException(me);
+        }
+    }
+
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
buildExternalDatasetDataScannerRuntime(
+            JobSpecification jobSpec, IAType itemType, IAdapterFactory 
adapterFactory, IDataFormat format)
+            throws AlgebricksException {
+        if (itemType.getTypeTag() != ATypeTag.RECORD) {
+            throw new AlgebricksException("Can only scan datasets of 
records.");
+        }
+
+        ISerializerDeserializer<?> payloadSerde = 
format.getSerdeProvider().getSerializerDeserializer(itemType);
+        RecordDescriptor scannerDesc = new RecordDescriptor(new 
ISerializerDeserializer[] { payloadSerde });
+
+        ExternalDataScanOperatorDescriptor dataScanner = new 
ExternalDataScanOperatorDescriptor(jobSpec, scannerDesc,
+                adapterFactory);
+
+        AlgebricksPartitionConstraint constraint;
+        try {
+            constraint = adapterFactory.getPartitionConstraint();
+        } catch (Exception e) {
+            throw new AlgebricksException(e);
+        }
+
+        return new Pair<>(dataScanner, constraint);
+    }
+
+    private Pair<IBinaryComparatorFactory[], ITypeTraits[]> 
getComparatorFactoriesAndTypeTraitsOfSecondaryBTreeIndex(
+            List<List<String>> sidxKeyFieldNames, List<IAType> 
sidxKeyFieldTypes, List<List<String>> pidxKeyFieldNames,
+            ARecordType recType, DatasetType dsType, boolean hasMeta, 
List<Integer> primaryIndexKeyIndicators,
+            List<Integer> secondaryIndexIndicators, ARecordType metaType) 
throws AlgebricksException {
+
+        IBinaryComparatorFactory[] comparatorFactories;
+        ITypeTraits[] typeTraits;
+        int sidxKeyFieldCount = sidxKeyFieldNames.size();
+        int pidxKeyFieldCount = pidxKeyFieldNames.size();
+        typeTraits = new ITypeTraits[sidxKeyFieldCount + pidxKeyFieldCount];
+        comparatorFactories = new IBinaryComparatorFactory[sidxKeyFieldCount + 
pidxKeyFieldCount];
+
+        int i = 0;
+        for (; i < sidxKeyFieldCount; ++i) {
+            Pair<IAType, Boolean> keyPairType = 
Index.getNonNullableOpenFieldType(sidxKeyFieldTypes.get(i),
+                    sidxKeyFieldNames.get(i),
+                    (hasMeta && secondaryIndexIndicators.get(i).intValue() == 
1) ? metaType : recType);
+            IAType keyType = keyPairType.first;
+            comparatorFactories[i] = 
AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
+                    true);
+            typeTraits[i] = 
AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+        }
+
+        for (int j = 0; j < pidxKeyFieldCount; ++j, ++i) {
+            IAType keyType = null;
+            try {
+                switch (dsType) {
+                    case INTERNAL:
+                        keyType = (hasMeta && 
primaryIndexKeyIndicators.get(j).intValue() == 1)
+                                ? 
metaType.getSubFieldType(pidxKeyFieldNames.get(j))
+                                : 
recType.getSubFieldType(pidxKeyFieldNames.get(j));
+                        break;
+                    case EXTERNAL:
+                        keyType = IndexingConstants.getFieldType(j);
+                        break;
+                    default:
+                        throw new AlgebricksException("Unknown Dataset Type");
+                }
+            } catch (AsterixException e) {
+                throw new AlgebricksException(e);
+            }
+            comparatorFactories[i] = 
AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
+                    true);
+            typeTraits[i] = 
AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+        }
+
+        return new Pair<>(comparatorFactories, typeTraits);
+    }
+
+    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
getInsertOrDeleteRuntime(IndexOperation indexOp,
+            IDataSource<DataSourceId> dataSource, IOperatorSchema 
propagatedSchema, List<LogicalVariable> keys,
+            LogicalVariable payload, List<LogicalVariable> 
additionalNonKeyFields, RecordDescriptor recordDesc,
+            JobGenContext context, JobSpecification spec, boolean bulkload,
+            List<LogicalVariable> additionalNonFilteringFields) throws 
AlgebricksException {
+
+        String datasetName = dataSource.getId().getDatasourceName();
+        Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, 
dataSource.getId().getDataverseName(),
+                datasetName);
+        boolean temp = dataset.getDatasetDetails().isTemp();
+        isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
+
+        int numKeys = keys.size();
+        int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 
: 1;
+        // Move key fields to front.
+        int[] fieldPermutation = new int[numKeys + 1 + numFilterFields
+                + (additionalNonFilteringFields == null ? 0 : 
additionalNonFilteringFields.size())];
+        int[] bloomFilterKeyFields = new int[numKeys];
+        int i = 0;
+        for (LogicalVariable varKey : keys) {
+            int idx = propagatedSchema.findVariable(varKey);
+            fieldPermutation[i] = idx;
+            bloomFilterKeyFields[i] = i;
+            i++;
+        }
+        fieldPermutation[i++] = propagatedSchema.findVariable(payload);
+        if (numFilterFields > 0) {
+            int idx = 
propagatedSchema.findVariable(additionalNonKeyFields.get(0));
+            fieldPermutation[i++] = idx;
+        }
+        if (additionalNonFilteringFields != null) {
+            for (LogicalVariable variable : additionalNonFilteringFields) {
+                int idx = propagatedSchema.findVariable(variable);
+                fieldPermutation[i++] = idx;
+            }
+        }
+
+        try {
+            Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, 
dataset.getDataverseName(),
+                    dataset.getDatasetName(), dataset.getDatasetName());
+            String indexName = primaryIndex.getIndexName();
+            ARecordType itemType = (ARecordType) MetadataManager.INSTANCE
+                    .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), 
dataset.getItemTypeName()).getDatatype();
+            ARecordType metaItemType = DatasetUtils.getMetaType(this, dataset);
+            ITypeTraits[] typeTraits = 
DatasetUtils.computeTupleTypeTraits(dataset, itemType, metaItemType);
+
+            IAsterixApplicationContextInfo appContext = 
(IAsterixApplicationContextInfo) context.getAppContext();
+            IBinaryComparatorFactory[] comparatorFactories = 
DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
+                    itemType, metaItemType, 
context.getBinaryComparatorFactoryProvider());
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitsAndConstraint =
+                    
splitProviderAndPartitionConstraintsForDataset(dataSource.getId().getDataverseName(),
 datasetName,
+                            indexName, temp);
+
+            // prepare callback
+            JobId jobId = ((JobEventListenerFactory) 
spec.getJobletEventListenerFactory()).getJobId();
+            int datasetId = dataset.getDatasetId();
+            int[] primaryKeyFields = new int[numKeys];
+            for (i = 0; i < numKeys; i++) {
+                primaryKeyFields[i] = i;
+            }
+
+            ITypeTraits[] filterTypeTraits = 
DatasetUtils.computeFilterTypeTraits(dataset, itemType);
+            IBinaryComparatorFactory[] filterCmpFactories = 
DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+                    itemType, context.getBinaryComparatorFactoryProvider());
+            int[] filterFields = DatasetUtils.createFilterFields(dataset);
+            int[] btreeFields = 
DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
+
+            TransactionSubsystemProvider txnSubsystemProvider = new 
TransactionSubsystemProvider();
+            IModificationOperationCallbackFactory modificationCallbackFactory 
= temp
+                    ? new 
TempDatasetPrimaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+                            primaryKeyFields, txnSubsystemProvider, indexOp, 
ResourceType.LSM_BTREE)
+                    : new 
PrimaryIndexModificationOperationCallbackFactory(jobId, datasetId, 
primaryKeyFields,
+                            txnSubsystemProvider, indexOp, 
ResourceType.LSM_BTREE, dataset.hasMetaPart());
+
+            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = 
DatasetUtils
+                    .getMergePolicyFactory(dataset, mdTxnCtx);
+            IIndexDataflowHelperFactory idfh = new 
LSMBTreeDataflowHelperFactory(
+                    new AsterixVirtualBufferCacheProvider(datasetId), 
compactionInfo.first, compactionInfo.second,
+                    new 
PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                    AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, 
LSMBTreeIOOperationCallbackFactory.INSTANCE,
+                    storageProperties.getBloomFilterFalsePositiveRate(), true, 
filterTypeTraits, filterCmpFactories,
+                    btreeFields, filterFields, !temp);
+            IOperatorDescriptor op;
+            if (bulkload) {
+                long numElementsHint = getCardinalityPerPartitionHint(dataset);
+                op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, 
appContext.getStorageManagerInterface(),
+                        appContext.getIndexLifecycleManagerProvider(), 
splitsAndConstraint.first, typeTraits,
+                        comparatorFactories, bloomFilterKeyFields, 
fieldPermutation,
+                        GlobalConfig.DEFAULT_TREE_FILL_FACTOR, true, 
numElementsHint, true, idfh);
+            } else {
+                op = new AsterixLSMTreeInsertDeleteOperatorDescriptor(spec, 
recordDesc,
+                        appContext.getStorageManagerInterface(), 
appContext.getIndexLifecycleManagerProvider(),
+                        splitsAndConstraint.first, typeTraits, 
comparatorFactories, bloomFilterKeyFields,
+                        fieldPermutation, indexOp, idfh, null, true, 
indexName, null, modificationCallbackFactory,
+                        NoOpOperationCallbackFactory.INSTANCE);
+            }
+            return new Pair<>(op, splitsAndConstraint.second);
+        } catch (MetadataException me) {
+            throw new AlgebricksException(me);
+        }
+    }
+
+    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
getIndexInsertOrDeleteOrUpsertRuntime(
+            IndexOperation indexOp, IDataSourceIndex<String, DataSourceId> 
dataSourceIndex,
+            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, 
IVariableTypeEnvironment typeEnv,
+            List<LogicalVariable> primaryKeys, List<LogicalVariable> 
secondaryKeys,
+            List<LogicalVariable> additionalNonKeyFields, ILogicalExpression 
filterExpr, RecordDescriptor recordDesc,
+            JobGenContext context, JobSpecification spec, boolean bulkload, 
List<LogicalVariable> prevSecondaryKeys,
+            LogicalVariable prevAdditionalFilteringKey) throws 
AlgebricksException {
+        String indexName = dataSourceIndex.getId();
+        String dataverseName = 
dataSourceIndex.getDataSource().getId().getDataverseName();
+        String datasetName = 
dataSourceIndex.getDataSource().getId().getDatasourceName();
+
+        Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, 
dataverseName, datasetName);
+        Index secondaryIndex;
+        try {
+            secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, 
dataset.getDataverseName(),
+                    dataset.getDatasetName(), indexName);
+        } catch (MetadataException e) {
+            throw new AlgebricksException(e);
+        }
+
+        ArrayList<LogicalVariable> prevAdditionalFilteringKeys = null;
+        if (indexOp == IndexOperation.UPSERT && prevAdditionalFilteringKey != 
null) {
+            prevAdditionalFilteringKeys = new ArrayList<>();
+            prevAdditionalFilteringKeys.add(prevAdditionalFilteringKey);
+        }
+        AsterixTupleFilterFactory filterFactory = 
createTupleFilterFactory(inputSchemas, typeEnv, filterExpr, context);
+        switch (secondaryIndex.getIndexType()) {
+            case BTREE:
+                return getBTreeRuntime(dataverseName, datasetName, indexName, 
propagatedSchema, primaryKeys,
+                        secondaryKeys, additionalNonKeyFields, filterFactory, 
recordDesc, context, spec, indexOp,
+                        bulkload, prevSecondaryKeys, 
prevAdditionalFilteringKeys);
+            case RTREE:
+                return getRTreeRuntime(dataverseName, datasetName, indexName, 
propagatedSchema, primaryKeys,
+                        secondaryKeys, additionalNonKeyFields, filterFactory, 
recordDesc, context, spec, indexOp,
+                        bulkload, prevSecondaryKeys, 
prevAdditionalFilteringKeys);
+            case SINGLE_PARTITION_WORD_INVIX:
+            case SINGLE_PARTITION_NGRAM_INVIX:
+            case LENGTH_PARTITIONED_WORD_INVIX:
+            case LENGTH_PARTITIONED_NGRAM_INVIX:
+                return getInvertedIndexRuntime(dataverseName, datasetName, 
indexName, propagatedSchema, primaryKeys,
+                        secondaryKeys, additionalNonKeyFields, filterFactory, 
recordDesc, context, spec, indexOp,
+                        secondaryIndex.getIndexType(), bulkload, 
prevSecondaryKeys, prevAdditionalFilteringKeys);
+            default:
+                throw new AlgebricksException(
+                        indexOp.name() + ": insert, upsert, and delete are not implemented for index type: "
+                                + secondaryIndex.getIndexType());
+        }
+    }
+
+    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
getBTreeRuntime(String dataverseName,
+            String datasetName, String indexName, IOperatorSchema 
propagatedSchema, List<LogicalVariable> primaryKeys,
+            List<LogicalVariable> secondaryKeys, List<LogicalVariable> 
additionalNonKeyFields,
+            AsterixTupleFilterFactory filterFactory, RecordDescriptor 
recordDesc, JobGenContext context,
+            JobSpecification spec, IndexOperation indexOp, boolean bulkload, 
List<LogicalVariable> prevSecondaryKeys,
+            List<LogicalVariable> prevAdditionalFilteringKeys) throws 
AlgebricksException {
+        Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, 
dataverseName, datasetName);
+        boolean temp = dataset.getDatasetDetails().isTemp();
+        isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
+
+        int numKeys = primaryKeys.size() + secondaryKeys.size();
+        int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 
: 1;
+
+        // generate field permutations
+        int[] fieldPermutation = new int[numKeys + numFilterFields];
+        int[] bloomFilterKeyFields = new int[secondaryKeys.size()];
+        int[] modificationCallbackPrimaryKeyFields = new 
int[primaryKeys.size()];
+        int i = 0;
+        int j = 0;
+        for (LogicalVariable varKey : secondaryKeys) {
+            int idx = propagatedSchema.findVariable(varKey);
+            fieldPermutation[i] = idx;
+            bloomFilterKeyFields[i] = i;
+            i++;
+        }
+  

<TRUNCATED>
