http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42600592/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
new file mode 100644
index 0000000..7258be8
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -0,0 +1,2256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.declared;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.config.AsterixStorageProperties;
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
+import org.apache.asterix.common.config.DatasetConfig.IndexType;
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider;
+import org.apache.asterix.common.context.ITransactionSubsystemProvider;
+import org.apache.asterix.common.context.TransactionSubsystemProvider;
+import org.apache.asterix.common.dataflow.AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor;
+import org.apache.asterix.common.dataflow.AsterixLSMTreeInsertDeleteOperatorDescriptor;
+import org.apache.asterix.common.dataflow.IAsterixApplicationContextInfo;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
+import org.apache.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallbackFactory;
+import org.apache.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
+import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
+import org.apache.asterix.common.library.ILibraryManager;
+import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
+import org.apache.asterix.common.transactions.JobId;
+import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
+import org.apache.asterix.external.adapter.factory.LookupAdapterFactory;
+import org.apache.asterix.external.api.IAdapterFactory;
+import org.apache.asterix.external.api.IDataSourceAdapter;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.indexing.IndexingConstants;
+import org.apache.asterix.external.operators.ExternalBTreeSearchOperatorDescriptor;
+import org.apache.asterix.external.operators.ExternalDataScanOperatorDescriptor;
+import org.apache.asterix.external.operators.ExternalLookupOperatorDescriptor;
+import org.apache.asterix.external.operators.ExternalRTreeSearchOperatorDescriptor;
+import org.apache.asterix.external.operators.FeedIntakeOperatorDescriptor;
+import org.apache.asterix.external.provider.AdapterFactoryProvider;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.FeedConstants;
+import org.apache.asterix.formats.base.IDataFormat;
+import org.apache.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
+import org.apache.asterix.formats.nontagged.AqlLinearizeComparatorFactoryProvider;
+import org.apache.asterix.formats.nontagged.AqlTypeTraitProvider;
+import org.apache.asterix.metadata.MetadataException;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.dataset.hints.DatasetHints.DatasetCardinalityHint;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.DatasourceAdapter;
+import org.apache.asterix.metadata.entities.Dataverse;
+import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
+import org.apache.asterix.metadata.entities.Feed;
+import org.apache.asterix.metadata.entities.FeedPolicyEntity;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails;
+import org.apache.asterix.metadata.feeds.FeedMetadataUtil;
+import org.apache.asterix.metadata.utils.DatasetUtils;
+import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry;
+import org.apache.asterix.metadata.utils.MetadataConstants;
+import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
+import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.util.NonTaggedFormatUtil;
+import org.apache.asterix.runtime.base.AsterixTupleFilterFactory;
+import org.apache.asterix.runtime.formats.FormatUtils;
+import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
+import org.apache.asterix.runtime.operators.AsterixLSMInvertedIndexUpsertOperatorDescriptor;
+import org.apache.asterix.runtime.operators.AsterixLSMTreeUpsertOperatorDescriptor;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.AsterixRuntimeComponentsProvider;
+import org.apache.asterix.runtime.util.ClusterStateManager;
+import org.apache.asterix.transaction.management.opcallbacks.LockThenSearchOperationCallbackFactory;
+import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
+import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory;
+import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerProvider;
+import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexModificationOperationCallbackFactory;
+import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
+import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexSearchOperationCallbackFactory;
+import org.apache.asterix.transaction.management.opcallbacks.TempDatasetPrimaryIndexModificationOperationCallbackFactory;
+import org.apache.asterix.transaction.management.opcallbacks.TempDatasetSecondaryIndexModificationOperationCallbackFactory;
+import org.apache.asterix.transaction.management.opcallbacks.UpsertOperationCallbackFactory;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.common.utils.Triple;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSink;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import org.apache.hyracks.algebricks.data.IAWriterFactory;
+import org.apache.hyracks.algebricks.data.IPrinterFactory;
+import org.apache.hyracks.algebricks.data.IResultSerializerFactoryProvider;
+import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.operators.std.SinkWriterRuntimeFactory;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.IResultSerializerFactory;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import org.apache.hyracks.data.std.primitive.ShortPointable;
+import org.apache.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer;
+import org.apache.hyracks.dataflow.std.file.FileSplit;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+import org.apache.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
+import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeDataflowHelperFactory;
+import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeWithBuddyDataflowHelperFactory;
+import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
+import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexBulkLoadOperatorDescriptor;
+import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.PartitionedLSMInvertedIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
+import org.apache.hyracks.storage.am.lsm.rtree.dataflow.ExternalRTreeDataflowHelperFactory;
+import org.apache.hyracks.storage.am.lsm.rtree.dataflow.LSMRTreeWithAntiMatterTuplesDataflowHelperFactory;
+import org.apache.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescriptor;
+import org.apache.hyracks.storage.am.rtree.frames.RTreePolicyType;
+
+public class MetadataProvider implements IMetadataProvider<DataSourceId, String> {
+
+    private final AsterixStorageProperties storageProperties;
+    private final ILibraryManager libraryManager;
+    private final Dataverse defaultDataverse;
+
+    private MetadataTransactionContext mdTxnCtx;
+    private boolean isWriteTransaction;
+    private Map<String, String> config;
+    private IAWriterFactory writerFactory;
+    private FileSplit outputFile;
+    private boolean asyncResults;
+    private ResultSetId resultSetId;
+    private IResultSerializerFactoryProvider resultSerializerFactoryProvider;
+    private JobId jobId;
+    private Map<String, Integer> locks;
+    private boolean isTemporaryDatasetWriteJob = true;
+
+    public MetadataProvider(Dataverse defaultDataverse) {
+        this.defaultDataverse = defaultDataverse;
+        this.storageProperties = AsterixAppContextInfo.INSTANCE.getStorageProperties();
+        this.libraryManager = AsterixAppContextInfo.INSTANCE.getLibraryManager();
+    }
+
+    public String getPropertyValue(String propertyName) {
+        return config.get(propertyName);
+    }
+
+    public void setConfig(Map<String, String> config) {
+        this.config = config;
+    }
+
+    public Map<String, String> getConfig() {
+        return config;
+    }
+
+    public ILibraryManager getLibraryManager() {
+        return libraryManager;
+    }
+
+    public void setJobId(JobId jobId) {
+        this.jobId = jobId;
+    }
+
+    public Dataverse getDefaultDataverse() {
+        return defaultDataverse;
+    }
+
+    public String getDefaultDataverseName() {
+        return defaultDataverse == null ? null : defaultDataverse.getDataverseName();
+    }
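
The setters above are how the compiler wires a MetadataProvider into a single job compilation. A minimal sketch of typical configuration, using only methods defined in this class (the surrounding variables are illustrative, not part of this change):

    // Illustrative setup; defaultDataverse, mdTxnCtx and jobId come from the caller.
    MetadataProvider metadataProvider = new MetadataProvider(defaultDataverse);
    metadataProvider.setMetadataTxnContext(mdTxnCtx);    // metadata txn used for lookups
    metadataProvider.setJobId(jobId);                    // transactional job id
    metadataProvider.setWriteTransaction(true);          // job writes a persistent dataset
    metadataProvider.setResultSetId(new ResultSetId(1)); // where results are delivered
    metadataProvider.setResultAsyncMode(false);          // synchronous result reads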
+
+    public void setWriteTransaction(boolean writeTransaction) {
+        this.isWriteTransaction = writeTransaction;
+    }
+
+    public void setWriterFactory(IAWriterFactory writerFactory) {
+        this.writerFactory = writerFactory;
+    }
+
+    public void setMetadataTxnContext(MetadataTransactionContext mdTxnCtx) {
+        this.mdTxnCtx = mdTxnCtx;
+    }
+
+    public MetadataTransactionContext getMetadataTxnContext() {
+        return mdTxnCtx;
+    }
+
+    public IAWriterFactory getWriterFactory() {
+        return this.writerFactory;
+    }
+
+    public FileSplit getOutputFile() {
+        return outputFile;
+    }
+
+    public void setOutputFile(FileSplit outputFile) {
+        this.outputFile = outputFile;
+    }
+
+    public boolean getResultAsyncMode() {
+        return asyncResults;
+    }
+
+    public void setResultAsyncMode(boolean asyncResults) {
+        this.asyncResults = asyncResults;
+    }
+
+    public ResultSetId getResultSetId() {
+        return resultSetId;
+    }
+
+    public void setResultSetId(ResultSetId resultSetId) {
+        this.resultSetId = resultSetId;
+    }
+
+    public void setResultSerializerFactoryProvider(IResultSerializerFactoryProvider rafp) {
+        this.resultSerializerFactoryProvider = rafp;
+    }
+
+    public IResultSerializerFactoryProvider getResultSerializerFactoryProvider() {
+        return resultSerializerFactoryProvider;
+    }
+
+    public boolean isWriteTransaction() {
+        // The transaction writes persistent datasets.
+        return isWriteTransaction;
+    }
+
+    public boolean isTemporaryDatasetWriteJob() {
+        // The transaction only writes temporary datasets.
+        return isTemporaryDatasetWriteJob;
+    }
+
+    public IDataFormat getFormat() {
+        return FormatUtils.getDefaultFormat();
+    }
+
+    public AsterixStorageProperties getStorageProperties() {
+        return storageProperties;
+    }
+
+    public Map<String, Integer> getLocks() {
+        return locks;
+    }
+
+    public void setLocks(Map<String, Integer> locks) {
+        this.locks = locks;
+    }
+
+    /**
+     * Retrieve the Output RecordType, as defined by "set output-record-type".
+     */
+    public ARecordType findOutputRecordType() throws AlgebricksException {
+        return MetadataManagerUtil.findOutputRecordType(mdTxnCtx, getDefaultDataverseName(),
+                getPropertyValue("output-record-type"));
+    }
+
+    public Dataset findDataset(String dataverse, String dataset) throws AlgebricksException {
+        String dv = dataverse == null ? (defaultDataverse == null ? null : defaultDataverse.getDataverseName())
+                : dataverse;
+        if (dv == null) {
+            return null;
+        }
+        return MetadataManagerUtil.findDataset(mdTxnCtx, dv, dataset);
+    }
+
+    public INodeDomain findNodeDomain(String nodeGroupName) throws AlgebricksException {
+        return MetadataManagerUtil.findNodeDomain(mdTxnCtx, nodeGroupName);
+    }
+
+    public IAType findType(String dataverse, String typeName) throws AlgebricksException {
+        return MetadataManagerUtil.findType(mdTxnCtx, dataverse, typeName);
+    }
+
+    public Feed findFeed(String dataverse, String feedName) throws AlgebricksException {
+        return MetadataManagerUtil.findFeed(mdTxnCtx, dataverse, feedName);
+    }
+
+    public FeedPolicyEntity findFeedPolicy(String dataverse, String policyName) throws AlgebricksException {
+        return MetadataManagerUtil.findFeedPolicy(mdTxnCtx, dataverse, policyName);
+    }
+
+    @Override
+    public DataSource findDataSource(DataSourceId id) throws AlgebricksException {
+        return MetadataManagerUtil.findDataSource(mdTxnCtx, id);
+    }
+
+    public DataSource lookupSourceInMetadata(DataSourceId aqlId) throws AlgebricksException {
+        return MetadataManagerUtil.lookupSourceInMetadata(mdTxnCtx, aqlId);
+    }
+
+    @Override
+    public IDataSourceIndex<String, DataSourceId> findDataSourceIndex(String indexId, DataSourceId dataSourceId)
+            throws AlgebricksException {
+        DataSource source = findDataSource(dataSourceId);
+        Dataset dataset = ((DatasetDataSource) source).getDataset();
+        try {
+            String indexName = indexId;
+            Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+                    dataset.getDatasetName(), indexName);
+            if (secondaryIndex != null) {
+                return new DataSourceIndex(secondaryIndex, dataset.getDataverseName(), dataset.getDatasetName(), this);
+            } else {
+                Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+                        dataset.getDatasetName(), dataset.getDatasetName());
+                if (primaryIndex.getIndexName().equals(indexId)) {
+                    return new DataSourceIndex(primaryIndex, dataset.getDataverseName(), dataset.getDatasetName(),
+                            this);
+                } else {
+                    return null;
+                }
+            }
+        } catch (MetadataException me) {
+            throw new AlgebricksException(me);
+        }
+    }
+
+    public List<Index> getDatasetIndexes(String dataverseName, String datasetName) throws AlgebricksException {
+        return MetadataManagerUtil.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
+    }
+
+    @Override
+    public IFunctionInfo lookupFunction(FunctionIdentifier fid) {
+        return AsterixBuiltinFunctions.lookupFunction(fid);
+    }
+
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getScannerRuntime(
+            IDataSource<DataSourceId> dataSource, List<LogicalVariable> scanVariables,
+            List<LogicalVariable> projectVariables, boolean projectPushed, List<LogicalVariable> minFilterVars,
+            List<LogicalVariable> maxFilterVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
+            JobGenContext context, JobSpecification jobSpec, Object implConfig) throws AlgebricksException {
+        try {
+            return ((DataSource) dataSource).buildDatasourceScanRuntime(this, dataSource, scanVariables,
+                    projectVariables, projectPushed, minFilterVars, maxFilterVars, opSchema, typeEnv, context, jobSpec,
+                    implConfig);
+        } catch (AsterixException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    public static AlgebricksAbsolutePartitionConstraint determineLocationConstraint(FeedDataSource feedDataSource) {
+        return new AlgebricksAbsolutePartitionConstraint(feedDataSource.getLocations());
+    }
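
findDataSourceIndex above resolves an index name first against the dataset's secondary indexes and then falls back to the primary index, whose name equals the dataset name; a null return means no such access path exists. A hypothetical caller (metadataProvider and dataSourceId assumed to be in scope):

    // Hypothetical: probe for a usable access path before generating an index search plan.
    IDataSourceIndex<String, DataSourceId> accessPath =
            metadataProvider.findDataSourceIndex("orders_location_idx", dataSourceId);
    if (accessPath == null) {
        // no matching index: fall back to a primary-index scan
    }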
+
+    protected Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildLoadableDatasetScan(
+            JobSpecification jobSpec, IAdapterFactory adapterFactory, RecordDescriptor rDesc)
+            throws AlgebricksException {
+        ExternalDataScanOperatorDescriptor dataScanner = new ExternalDataScanOperatorDescriptor(jobSpec, rDesc,
+                adapterFactory);
+        AlgebricksPartitionConstraint constraint;
+        try {
+            constraint = adapterFactory.getPartitionConstraint();
+        } catch (Exception e) {
+            throw new AlgebricksException(e);
+        }
+        return new Pair<>(dataScanner, constraint);
+    }
+
+    public IDataFormat getDataFormat(String dataverseName) throws AsterixException {
+        Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
+        IDataFormat format;
+        try {
+            format = (IDataFormat) Class.forName(dataverse.getDataFormat()).newInstance();
+        } catch (Exception e) {
+            throw new AsterixException(e);
+        }
+        return format;
+    }
+
+    public Dataverse findDataverse(String dataverseName) throws AsterixException {
+        return MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
+    }
+
+    public Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IAdapterFactory> buildFeedIntakeRuntime(
+            JobSpecification jobSpec, Feed primaryFeed, FeedPolicyAccessor policyAccessor) throws Exception {
+        Triple<IAdapterFactory, RecordDescriptor, IDataSourceAdapter.AdapterType> factoryOutput;
+        factoryOutput = FeedMetadataUtil.getPrimaryFeedFactoryAndOutput(primaryFeed, policyAccessor, mdTxnCtx,
+                libraryManager);
+        ARecordType recordType = FeedMetadataUtil.getOutputType(primaryFeed, primaryFeed.getAdapterConfiguration(),
+                ExternalDataConstants.KEY_TYPE_NAME);
+        IAdapterFactory adapterFactory = factoryOutput.first;
+        FeedIntakeOperatorDescriptor feedIngestor = null;
+        switch (factoryOutput.third) {
+            case INTERNAL:
+                feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, primaryFeed, adapterFactory, recordType,
+                        policyAccessor, factoryOutput.second);
+                break;
+            case EXTERNAL:
+                String libraryName = primaryFeed.getAdapterName().trim()
+                        .split(FeedConstants.NamingConstants.LIBRARY_NAME_SEPARATOR)[0];
+                feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, primaryFeed, libraryName,
+                        adapterFactory.getClass().getName(), recordType, policyAccessor, factoryOutput.second);
+                break;
+        }
+
+        AlgebricksPartitionConstraint partitionConstraint = adapterFactory.getPartitionConstraint();
+        return new Triple<>(feedIngestor, partitionConstraint, adapterFactory);
+    }
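
In the EXTERNAL branch of buildFeedIntakeRuntime, the hosting library is recovered by splitting the adapter name on FeedConstants.NamingConstants.LIBRARY_NAME_SEPARATOR. A tiny sketch of that naming convention, assuming the separator is "#" (an assumption; check the constant):

    // Assumed convention: "<library><separator><adapter>", e.g. "feedlib#my_adapter".
    String adapterName = "feedlib#my_adapter";             // hypothetical adapter name
    String libraryName = adapterName.trim().split("#")[0]; // -> "feedlib"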
+
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildBtreeRuntime(JobSpecification jobSpec,
+            List<LogicalVariable> outputVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
+            JobGenContext context, boolean retainInput, boolean retainMissing, Dataset dataset, String indexName,
+            int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive,
+            Object implConfig, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes) throws AlgebricksException {
+        boolean isSecondary = true;
+        int numSecondaryKeys = 0;
+        try {
+            boolean temp = dataset.getDatasetDetails().isTemp();
+            Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+                    dataset.getDatasetName(), dataset.getDatasetName());
+            if (primaryIndex != null && (dataset.getDatasetType() != DatasetType.EXTERNAL)) {
+                isSecondary = !indexName.equals(primaryIndex.getIndexName());
+            }
+            int numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
+            RecordDescriptor outputRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
+            int[] bloomFilterKeyFields;
+            ITypeTraits[] typeTraits;
+            IBinaryComparatorFactory[] comparatorFactories;
+
+            ARecordType itemType = (ARecordType) this.findType(dataset.getItemTypeDataverseName(),
+                    dataset.getItemTypeName());
+            ARecordType metaType = null;
+            List<Integer> primaryKeyIndicators = null;
+            if (dataset.hasMetaPart()) {
+                metaType = (ARecordType) findType(dataset.getMetaItemTypeDataverseName(),
+                        dataset.getMetaItemTypeName());
+                primaryKeyIndicators = ((InternalDatasetDetails) dataset.getDatasetDetails()).getKeySourceIndicator();
+            }
+
+            ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
+            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+                    itemType, context.getBinaryComparatorFactoryProvider());
+            int[] filterFields = null;
+            int[] btreeFields = null;
+
+            if (isSecondary) {
+                Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+                        dataset.getDatasetName(), indexName);
+                numSecondaryKeys = secondaryIndex.getKeyFieldNames().size();
+                bloomFilterKeyFields = new int[numSecondaryKeys];
+                for (int i = 0; i < numSecondaryKeys; i++) {
+                    bloomFilterKeyFields[i] = i;
+                }
+                Pair<IBinaryComparatorFactory[], ITypeTraits[]> comparatorFactoriesAndTypeTraits =
+                        getComparatorFactoriesAndTypeTraitsOfSecondaryBTreeIndex(
+                                secondaryIndex.getKeyFieldNames(), secondaryIndex.getKeyFieldTypes(),
+                                DatasetUtils.getPartitioningKeys(dataset), itemType, dataset.getDatasetType(),
+                                dataset.hasMetaPart(), primaryKeyIndicators,
+                                secondaryIndex.getKeyFieldSourceIndicators(), metaType);
+                comparatorFactories = comparatorFactoriesAndTypeTraits.first;
+                typeTraits = comparatorFactoriesAndTypeTraits.second;
+                if (filterTypeTraits != null) {
+                    filterFields = new int[1];
+                    filterFields[0] = numSecondaryKeys + numPrimaryKeys;
+                    btreeFields = new int[numSecondaryKeys + numPrimaryKeys];
+                    for (int k = 0; k < btreeFields.length; k++) {
+                        btreeFields[k] = k;
+                    }
+                }
+            } else {
+                bloomFilterKeyFields = new int[numPrimaryKeys];
+                for (int i = 0; i < numPrimaryKeys; i++) {
+                    bloomFilterKeyFields[i] = i;
+                }
+                // get meta item type
+                ARecordType metaItemType = DatasetUtils.getMetaType(this, dataset);
+                typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType, metaItemType);
+                comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset, itemType, metaItemType,
+                        context.getBinaryComparatorFactoryProvider());
+                filterFields = DatasetUtils.createFilterFields(dataset);
+                btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
+            }
+
+            IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
+            spPc = splitProviderAndPartitionConstraintsForDataset(dataset.getDataverseName(), dataset.getDatasetName(),
+                    indexName, temp);
+
+            ISearchOperationCallbackFactory searchCallbackFactory;
+            if (isSecondary) {
+                searchCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
+                        : new SecondaryIndexSearchOperationCallbackFactory();
+            } else {
+                JobId jobId = ((JobEventListenerFactory) jobSpec.getJobletEventListenerFactory()).getJobId();
+                int datasetId = dataset.getDatasetId();
+                int[] primaryKeyFields = new int[numPrimaryKeys];
+                for (int i = 0; i < numPrimaryKeys; i++) {
+                    primaryKeyFields[i] = i;
+                }
+
+                ITransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
+
+                /**
+                 * Due to the read-committed isolation level,
+                 * we may acquire a very short duration lock (i.e., an instant lock) for readers.
+                 */
+                searchCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
+                        : new PrimaryIndexInstantSearchOperationCallbackFactory(jobId, datasetId, primaryKeyFields,
+                                txnSubsystemProvider, ResourceType.LSM_BTREE);
+            }
+            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
+                    .getMergePolicyFactory(dataset, mdTxnCtx);
+            AsterixRuntimeComponentsProvider rtcProvider = AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
+            BTreeSearchOperatorDescriptor btreeSearchOp;
+            if (dataset.getDatasetType() == DatasetType.INTERNAL) {
+                btreeSearchOp = new BTreeSearchOperatorDescriptor(jobSpec, outputRecDesc,
+                        appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
+                        spPc.first, typeTraits, comparatorFactories, bloomFilterKeyFields, lowKeyFields, highKeyFields,
+                        lowKeyInclusive, highKeyInclusive,
+                        new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
+                                compactionInfo.first, compactionInfo.second,
+                                isSecondary ? new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId())
+                                        : new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                                rtcProvider, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+                                storageProperties.getBloomFilterFalsePositiveRate(), !isSecondary, filterTypeTraits,
+                                filterCmpFactories, btreeFields, filterFields, !temp),
+                        retainInput, retainMissing, context.getMissingWriterFactory(), searchCallbackFactory,
+                        minFilterFieldIndexes, maxFilterFieldIndexes);
+            } else {
+                // External dataset: use the B-tree with a buddy B-tree.
+                // Note: be careful with the key start index here.
+                int[] buddyBtreeFields = new int[] { numSecondaryKeys };
+                ExternalBTreeWithBuddyDataflowHelperFactory indexDataflowHelperFactory =
+                        new ExternalBTreeWithBuddyDataflowHelperFactory(
+                                compactionInfo.first, compactionInfo.second,
+                                new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                                LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE,
+                                getStorageProperties().getBloomFilterFalsePositiveRate(), buddyBtreeFields,
+                                ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, this), !temp);
+                btreeSearchOp = new ExternalBTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, rtcProvider,
+                        rtcProvider, spPc.first, typeTraits, comparatorFactories, bloomFilterKeyFields, lowKeyFields,
+                        highKeyFields, lowKeyInclusive, highKeyInclusive, indexDataflowHelperFactory, retainInput,
+                        retainMissing, context.getMissingWriterFactory(), searchCallbackFactory);
+            }
+            return new Pair<>(btreeSearchOp, spPc.second);
+        } catch (MetadataException me) {
+            throw new AlgebricksException(me);
+        }
+    }
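
In buildBtreeRuntime, the bloom filter always covers the leading key slots of the stored tuple, and when the dataset has a filter field it sits immediately after all keys. The index arithmetic in miniature (the counts are illustrative):

    // Illustrative: a secondary index with 2 keys on a dataset with 1 primary key.
    int numSecondaryKeys = 2;
    int numPrimaryKeys = 1;
    int[] bloomFilterKeyFields = new int[numSecondaryKeys];
    for (int i = 0; i < numSecondaryKeys; i++) {
        bloomFilterKeyFields[i] = i;                          // {0, 1}
    }
    int filterFieldIndex = numSecondaryKeys + numPrimaryKeys; // 3: first slot after all keys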
+
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildRtreeRuntime(JobSpecification jobSpec,
+            List<LogicalVariable> outputVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
+            JobGenContext context, boolean retainInput, boolean retainMissing, Dataset dataset, String indexName,
+            int[] keyFields, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes) throws AlgebricksException {
+        try {
+            ARecordType recType = (ARecordType) findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
+            int numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
+
+            boolean temp = dataset.getDatasetDetails().isTemp();
+            Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+                    dataset.getDatasetName(), indexName);
+            if (secondaryIndex == null) {
+                throw new AlgebricksException(
+                        "Code generation error: no index " + indexName + " for dataset " + dataset.getDatasetName());
+            }
+            List<List<String>> secondaryKeyFields = secondaryIndex.getKeyFieldNames();
+            List<IAType> secondaryKeyTypes = secondaryIndex.getKeyFieldTypes();
+            int numSecondaryKeys = secondaryKeyFields.size();
+            if (numSecondaryKeys != 1) {
+                throw new AlgebricksException(
+                        "Cannot use " + numSecondaryKeys + " fields as a key for the R-tree index. "
+                                + "There can be only one field as a key for the R-tree index.");
+            }
+            Pair<IAType, Boolean> keyTypePair = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0),
+                    secondaryKeyFields.get(0), recType);
+            IAType keyType = keyTypePair.first;
+            if (keyType == null) {
+                throw new AlgebricksException("Could not find field " + secondaryKeyFields.get(0) + " in the schema.");
+            }
+            int numDimensions = NonTaggedFormatUtil.getNumDimensions(keyType.getTypeTag());
+            boolean isPointMBR = keyType.getTypeTag() == ATypeTag.POINT || keyType.getTypeTag() == ATypeTag.POINT3D;
+            int numNestedSecondaryKeyFields = numDimensions * 2;
+            IPrimitiveValueProviderFactory[] valueProviderFactories =
+                    new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
+            for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
+                valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
+            }
+
+            RecordDescriptor outputRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
+            // TODO: isn't the variable below always 0?
+            int keysStartIndex = outputRecDesc.getFieldCount() - numNestedSecondaryKeyFields - numPrimaryKeys;
+            if (retainInput) {
+                keysStartIndex -= numNestedSecondaryKeyFields;
+            }
+            IBinaryComparatorFactory[] comparatorFactories = JobGenHelper.variablesToAscBinaryComparatorFactories(
+                    outputVars, keysStartIndex, numNestedSecondaryKeyFields, typeEnv, context);
+            ITypeTraits[] typeTraits = JobGenHelper.variablesToTypeTraits(outputVars, keysStartIndex,
+                    numNestedSecondaryKeyFields + numPrimaryKeys, typeEnv, context);
+            IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc =
+                    splitProviderAndPartitionConstraintsForDataset(dataset.getDataverseName(),
+                            dataset.getDatasetName(), indexName, temp);
+            ARecordType metaType = null;
+            if (dataset.hasMetaPart()) {
+                metaType = (ARecordType) findType(dataset.getMetaItemTypeDataverseName(),
+                        dataset.getMetaItemTypeName());
+            }
+
+            IBinaryComparatorFactory[] primaryComparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
+                    dataset, recType, metaType, context.getBinaryComparatorFactoryProvider());
+            int[] btreeFields = new int[primaryComparatorFactories.length];
+            for (int i = 0; i < btreeFields.length; i++) {
+                btreeFields[i] = i + numNestedSecondaryKeyFields;
+            }
+
+            ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType);
+            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+                    recType, context.getBinaryComparatorFactoryProvider());
+            int[] filterFields = null;
+            int[] rtreeFields = null;
+            if (filterTypeTraits != null) {
+                filterFields = new int[1];
+                filterFields[0] = numNestedSecondaryKeyFields + numPrimaryKeys;
+                rtreeFields = new int[numNestedSecondaryKeyFields + numPrimaryKeys];
+                for (int i = 0; i < rtreeFields.length; i++) {
+                    rtreeFields[i] = i;
+                }
+            }
+
+            IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(keyType.getTypeTag());
+            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
+                    .getMergePolicyFactory(dataset, mdTxnCtx);
+            ISearchOperationCallbackFactory searchCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
+                    : new SecondaryIndexSearchOperationCallbackFactory();
+
+            RTreeSearchOperatorDescriptor rtreeSearchOp;
+            if (dataset.getDatasetType() == DatasetType.INTERNAL) {
+                IBinaryComparatorFactory[] deletedKeyBTreeCompFactories = getMergedComparatorFactories(
+                        comparatorFactories, primaryComparatorFactories);
+                IIndexDataflowHelperFactory idff = new LSMRTreeWithAntiMatterTuplesDataflowHelperFactory(
+                        valueProviderFactories, RTreePolicyType.RTREE, deletedKeyBTreeCompFactories,
+                        new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), compactionInfo.first,
+                        compactionInfo.second, new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMRTreeIOOperationCallbackFactory.INSTANCE,
+                        MetadataProvider.proposeLinearizer(nestedKeyType.getTypeTag(), comparatorFactories.length),
+                        rtreeFields, filterTypeTraits, filterCmpFactories, filterFields, !temp, isPointMBR);
+                rtreeSearchOp = new RTreeSearchOperatorDescriptor(jobSpec, outputRecDesc,
+                        appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
+                        spPc.first, typeTraits, comparatorFactories, keyFields, idff, retainInput, retainMissing,
+                        context.getMissingWriterFactory(), searchCallbackFactory, minFilterFieldIndexes,
+                        maxFilterFieldIndexes);
+            } else {
+                // External Dataset
+                ExternalRTreeDataflowHelperFactory indexDataflowHelperFactory = new ExternalRTreeDataflowHelperFactory(
+                        valueProviderFactories, RTreePolicyType.RTREE,
+                        IndexingConstants.getBuddyBtreeComparatorFactories(), compactionInfo.first,
+                        compactionInfo.second, new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMRTreeIOOperationCallbackFactory.INSTANCE,
+                        proposeLinearizer(nestedKeyType.getTypeTag(), comparatorFactories.length),
+                        getStorageProperties().getBloomFilterFalsePositiveRate(),
+                        new int[] { numNestedSecondaryKeyFields },
+                        ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, this), !temp, isPointMBR);
+                // Create the operator
+                rtreeSearchOp = new ExternalRTreeSearchOperatorDescriptor(jobSpec, outputRecDesc,
+                        appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
+                        spPc.first, typeTraits, comparatorFactories, keyFields, indexDataflowHelperFactory, retainInput,
+                        retainMissing, context.getMissingWriterFactory(), searchCallbackFactory);
+            }
+
+            return new Pair<>(rtreeSearchOp, spPc.second);
+        } catch (MetadataException me) {
+            throw new AlgebricksException(me);
+        }
+    }
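
buildRtreeRuntime flattens each spatial key into an MBR with one (min, max) pair per dimension, and appends the primary keys after those fields. The arithmetic, with illustrative numbers:

    // Illustrative: a 2-D key (e.g. point or rectangle) yields numDimensions = 2.
    int numDimensions = 2;
    int numNestedSecondaryKeyFields = numDimensions * 2;    // 4: {xmin, ymin, xmax, ymax}
    int numPrimaryKeys = 1;
    int firstPrimaryKeyField = numNestedSecondaryKeyFields; // primary key stored at slot 4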
+
+    @Override
+    public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink,
+            int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc) {
+        FileSplitDataSink fsds = (FileSplitDataSink) sink;
+        FileSplitSinkId fssi = fsds.getId();
+        FileSplit fs = fssi.getFileSplit();
+        File outFile = fs.getLocalFile().getFile();
+        String nodeId = fs.getNodeName();
+
+        SinkWriterRuntimeFactory runtime = new SinkWriterRuntimeFactory(printColumns, printerFactories, outFile,
+                getWriterFactory(), inputDesc);
+        AlgebricksPartitionConstraint apc = new AlgebricksAbsolutePartitionConstraint(new String[] { nodeId });
+        return new Pair<>(runtime, apc);
+    }
+
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,
+            int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc, boolean ordered,
+            JobSpecification spec) throws AlgebricksException {
+        ResultSetDataSink rsds = (ResultSetDataSink) sink;
+        ResultSetSinkId rssId = rsds.getId();
+        ResultSetId rsId = rssId.getResultSetId();
+        ResultWriterOperatorDescriptor resultWriter = null;
+        try {
+            IResultSerializerFactory resultSerializedAppenderFactory = resultSerializerFactoryProvider
+                    .getAqlResultSerializerFactoryProvider(printColumns, printerFactories, getWriterFactory());
+            resultWriter = new ResultWriterOperatorDescriptor(spec, rsId, ordered, getResultAsyncMode(),
+                    resultSerializedAppenderFactory);
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+        return new Pair<>(resultWriter, null);
+    }
+
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(
+            IDataSource<DataSourceId> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
+            LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields, JobGenContext context,
+            JobSpecification spec) throws AlgebricksException {
+        String dataverseName = dataSource.getId().getDataverseName();
+        String datasetName = dataSource.getId().getDatasourceName();
+
+        Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
+        int numKeys = keys.size();
+        int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
+
+        // move key fields to front
+        int[] fieldPermutation = new int[numKeys + 1 + numFilterFields];
+        int[] bloomFilterKeyFields = new int[numKeys];
+        int i = 0;
+        for (LogicalVariable varKey : keys) {
+            int idx = propagatedSchema.findVariable(varKey);
+            fieldPermutation[i] = idx;
+            bloomFilterKeyFields[i] = i;
+            i++;
+        }
+        fieldPermutation[numKeys] = propagatedSchema.findVariable(payload);
+        if (numFilterFields > 0) {
+            int idx = propagatedSchema.findVariable(additionalNonKeyFields.get(0));
+            fieldPermutation[numKeys + 1] = idx;
+        }
+
+        try {
+            boolean temp = dataset.getDatasetDetails().isTemp();
+            isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
+
+            Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+                    dataset.getDatasetName(), dataset.getDatasetName());
+            String indexName = primaryIndex.getIndexName();
+
+            ARecordType metaType = null;
+            if (dataset.hasMetaPart()) {
+                metaType = (ARecordType) findType(dataset.getMetaItemTypeDataverseName(),
+                        dataset.getMetaItemTypeName());
+            }
+
+            String itemTypeName = dataset.getItemTypeName();
+            ARecordType itemType = (ARecordType) MetadataManager.INSTANCE
+                    .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName).getDatatype();
+            ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType, null);
+            IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
+                    itemType, metaType, context.getBinaryComparatorFactoryProvider());
+
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
+                    splitProviderAndPartitionConstraintsForDataset(dataSource.getId().getDataverseName(),
+                            datasetName, indexName, temp);
+            IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+
+            long numElementsHint = getCardinalityPerPartitionHint(dataset);
+
+            ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
+            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+                    itemType, context.getBinaryComparatorFactoryProvider());
+            int[] filterFields = DatasetUtils.createFilterFields(dataset);
+            int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
+
+            // TODO: figure out the right behavior of the bulkload and then give the right callback
+            // (e.g., what's the expected behavior when there is an error during bulkload?)
+            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
+                    .getMergePolicyFactory(dataset, mdTxnCtx);
+            TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec, null,
+                    appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
+                    splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields, fieldPermutation,
+                    GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, true,
+                    new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
+                            compactionInfo.first, compactionInfo.second,
+                            new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                            LSMBTreeIOOperationCallbackFactory.INSTANCE,
+                            storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits,
+                            filterCmpFactories, btreeFields, filterFields, !temp));
+            return new Pair<>(btreeBulkLoad, splitsAndConstraint.second);
+        } catch (MetadataException me) {
+            throw new AlgebricksException(me);
+        }
+    }
+
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(
+            IDataSource<DataSourceId> dataSource, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
+            List<LogicalVariable> keys, LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields,
+            List<LogicalVariable> additionalNonFilteringFields, RecordDescriptor recordDesc, JobGenContext context,
+            JobSpecification spec, boolean bulkload) throws AlgebricksException {
+        return getInsertOrDeleteRuntime(IndexOperation.INSERT, dataSource, propagatedSchema, keys, payload,
+                additionalNonKeyFields, recordDesc, context, spec, bulkload, additionalNonFilteringFields);
+    }
+
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getDeleteRuntime(
+            IDataSource<DataSourceId> dataSource, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
+            List<LogicalVariable> keys, LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields,
+            RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec) throws AlgebricksException {
+        return getInsertOrDeleteRuntime(IndexOperation.DELETE, dataSource, propagatedSchema, keys, payload,
+                additionalNonKeyFields, recordDesc, context, spec, false, null);
+    }
+
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertRuntime(
+            IDataSourceIndex<String, DataSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
+            IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
+            List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
+            ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec,
+            boolean bulkload) throws AlgebricksException {
+        return getIndexInsertOrDeleteOrUpsertRuntime(IndexOperation.INSERT, dataSourceIndex, propagatedSchema,
+                inputSchemas, typeEnv, primaryKeys, secondaryKeys, additionalNonKeyFields, filterExpr, recordDesc,
+                context, spec, bulkload, null, null);
+    }
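
getWriteResultRuntime above (and getUpsertRuntime later in this file) reorder input columns into the storage layout [primary keys..., payload, optional filter]; fieldPermutation maps each target slot to its source column. A toy illustration with made-up schema positions:

    // Illustrative: keys arrive at input columns 2 and 1, payload at 0, filter at 3.
    int[] fieldPermutation = new int[] { 2, 1, 0, 3 }; // target order: key0, key1, payload, filter
    // The bloom filter names the leading key slots of the *target* tuple:
    int[] bloomFilterKeyFields = new int[] { 0, 1 };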
+
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexDeleteRuntime(
+            IDataSourceIndex<String, DataSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
+            IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
+            List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
+            ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
+            throws AlgebricksException {
+        return getIndexInsertOrDeleteOrUpsertRuntime(IndexOperation.DELETE, dataSourceIndex, propagatedSchema,
+                inputSchemas, typeEnv, primaryKeys, secondaryKeys, additionalNonKeyFields, filterExpr, recordDesc,
+                context, spec, false, null, null);
+    }
+
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexUpsertRuntime(
+            IDataSourceIndex<String, DataSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
+            IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
+            List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalFilteringKeys,
+            ILogicalExpression filterExpr, List<LogicalVariable> prevSecondaryKeys,
+            LogicalVariable prevAdditionalFilteringKey, RecordDescriptor recordDesc, JobGenContext context,
+            JobSpecification spec) throws AlgebricksException {
+        return getIndexInsertOrDeleteOrUpsertRuntime(IndexOperation.UPSERT, dataSourceIndex, propagatedSchema,
+                inputSchemas, typeEnv, primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, recordDesc,
+                context, spec, false, prevSecondaryKeys, prevAdditionalFilteringKey);
+    }
+
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getTokenizerRuntime(
+            IDataSourceIndex<String, DataSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
+            IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
+            List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
+            JobGenContext context, JobSpecification spec, boolean bulkload) throws AlgebricksException {
+
+        String indexName = dataSourceIndex.getId();
+        String dataverseName = dataSourceIndex.getDataSource().getId().getDataverseName();
+        String datasetName = dataSourceIndex.getDataSource().getId().getDatasourceName();
+
+        IOperatorSchema inputSchema;
+        if (inputSchemas.length > 0) {
+            inputSchema = inputSchemas[0];
+        } else {
+            throw new AlgebricksException("TokenizeOperator cannot operate without any input variable.");
+        }
+
+        Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
+        Index secondaryIndex;
+        try {
+            secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+                    dataset.getDatasetName(), indexName);
+        } catch (MetadataException e) {
+            throw new AlgebricksException(e);
+        }
+        // TokenizeOperator only supports a keyword or n-gram index.
+        switch (secondaryIndex.getIndexType()) {
+            case SINGLE_PARTITION_WORD_INVIX:
+            case SINGLE_PARTITION_NGRAM_INVIX:
+            case LENGTH_PARTITIONED_WORD_INVIX:
+            case LENGTH_PARTITIONED_NGRAM_INVIX:
+                return getBinaryTokenizerRuntime(dataverseName, datasetName, indexName, inputSchema, propagatedSchema,
+                        primaryKeys, secondaryKeys, recordDesc, spec, secondaryIndex.getIndexType());
+            default:
+                throw new AlgebricksException("Currently, we do not support TokenizeOperator for the index type: "
+                        + secondaryIndex.getIndexType());
+        }
+    }
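
getTokenizerRuntime accepts exactly the four inverted-index variants and rejects everything else. The same guard extracted as a sketch (IndexType is the DatasetConfig.IndexType imported above):

    // Sketch of the guard used above.
    static boolean isTokenizable(IndexType indexType) {
        switch (indexType) {
            case SINGLE_PARTITION_WORD_INVIX:
            case SINGLE_PARTITION_NGRAM_INVIX:
            case LENGTH_PARTITIONED_WORD_INVIX:
            case LENGTH_PARTITIONED_NGRAM_INVIX:
                return true;
            default:
                return false;
        }
    }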
Note that this is an + * estimation which assumes that the data is going to be uniformly + * distributed across all partitions. + * + * @param dataset + * @return Number of elements that will be used to create a bloom filter per + * dataset per partition + * @throws MetadataException + * @throws AlgebricksException + */ + public long getCardinalityPerPartitionHint(Dataset dataset) throws MetadataException, AlgebricksException { + String numElementsHintString = dataset.getHints().get(DatasetCardinalityHint.NAME); + long numElementsHint; + if (numElementsHintString == null) { + numElementsHint = DatasetCardinalityHint.DEFAULT; + } else { + numElementsHint = Long.parseLong(numElementsHintString); + } + int numPartitions = 0; + List<String> nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName()) + .getNodeNames(); + for (String nd : nodeGroup) { + numPartitions += ClusterStateManager.INSTANCE.getNodePartitionsCount(nd); + } + numElementsHint = numElementsHint / numPartitions; + return numElementsHint; + } + + protected IAdapterFactory getConfiguredAdapterFactory(Dataset dataset, String adapterName, + Map<String, String> configuration, ARecordType itemType, boolean isPKAutoGenerated, + List<List<String>> primaryKeys, ARecordType metaType) throws AlgebricksException { + try { + configuration.put(ExternalDataConstants.KEY_DATAVERSE, dataset.getDataverseName()); + IAdapterFactory adapterFactory = AdapterFactoryProvider.getAdapterFactory(libraryManager, adapterName, + configuration, itemType, metaType); + + // check to see if dataset is indexed + Index filesIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(), + dataset.getDatasetName(), + dataset.getDatasetName().concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX)); + + if (filesIndex != null && filesIndex.getPendingOp() == 0) { + // get files + List<ExternalFile> files = MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, dataset); + Iterator<ExternalFile> iterator = files.iterator(); + while (iterator.hasNext()) { + if (iterator.next().getPendingOp() != ExternalFilePendingOp.PENDING_NO_OP) { + iterator.remove(); + } + } + } + + return adapterFactory; + } catch (Exception e) { + throw new AlgebricksException("Unable to create adapter", e); + } + } + + public JobId getJobId() { + return jobId; + } + + public static ILinearizeComparatorFactory proposeLinearizer(ATypeTag keyType, int numKeyFields) + throws AlgebricksException { + return AqlLinearizeComparatorFactoryProvider.INSTANCE.getLinearizeComparatorFactory(keyType, true, + numKeyFields / 2); + } + + public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForDataset( + String dataverseName, String datasetName, String targetIdxName, boolean temp) throws AlgebricksException { + FileSplit[] splits = splitsForDataset(mdTxnCtx, dataverseName, datasetName, targetIdxName, temp); + return StoragePathUtil.splitProviderAndPartitionConstraints(splits); + } + + public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForDataverse( + String dataverse) { + return SplitsAndConstraintsUtil.splitProviderAndPartitionConstraintsForDataverse(dataverse); + } + + public FileSplit[] splitsForDataset(MetadataTransactionContext mdTxnCtx, String dataverseName, String datasetName, + String targetIdxName, boolean temp) throws AlgebricksException { + return SplitsAndConstraintsUtil.splitsForDataset(mdTxnCtx, dataverseName, datasetName, targetIdxName, temp); + } + + 
+
+    public DatasourceAdapter getAdapter(MetadataTransactionContext mdTxnCtx, String dataverseName, String adapterName)
+            throws MetadataException {
+        DatasourceAdapter adapter;
+        // search in default namespace (built-in adapter)
+        adapter = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME, adapterName);
+
+        // search in dataverse (user-defined adapter)
+        if (adapter == null) {
+            adapter = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, dataverseName, adapterName);
+        }
+        return adapter;
+    }
+
+    public AlgebricksAbsolutePartitionConstraint getClusterLocations() {
+        return ClusterStateManager.INSTANCE.getClusterLocations();
+    }
+
+    public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForFilesIndex(
+            String dataverseName, String datasetName, String targetIdxName, boolean create) throws AlgebricksException {
+        return SplitsAndConstraintsUtil.splitProviderAndPartitionConstraintsForFilesIndex(mdTxnCtx, dataverseName,
+                datasetName, targetIdxName, create);
+    }
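
getAdapter resolves built-in adapters from the Metadata dataverse before falling back to adapters installed in the user's dataverse. The same resolve-then-fall-back shape in miniature (lookup here is a hypothetical stand-in for MetadataManager.INSTANCE.getAdapter):

    // Hypothetical two-level resolution mirroring getAdapter's order.
    DatasourceAdapter adapter = lookup(MetadataConstants.METADATA_DATAVERSE_NAME, adapterName);
    if (adapter == null) {
        adapter = lookup(userDataverseName, adapterName); // user-defined adapter
    }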
+
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDataLookupRuntime(
+            JobSpecification jobSpec, Dataset dataset, Index secondaryIndex, int[] ridIndexes, boolean retainInput,
+            IVariableTypeEnvironment typeEnv, List<LogicalVariable> outputVars, IOperatorSchema opSchema,
+            JobGenContext context, MetadataProvider metadataProvider, boolean retainMissing)
+            throws AlgebricksException {
+        try {
+            // Get data type
+            IAType itemType;
+            itemType = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(),
+                    dataset.getDataverseName(), dataset.getItemTypeName()).getDatatype();
+
+            // Create the adapter factory. Right now there is only one; if there are more
+            // in the future, we can create a map.
+            ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
+            LookupAdapterFactory<?> adapterFactory = AdapterFactoryProvider.getLookupAdapterFactory(libraryManager,
+                    datasetDetails.getProperties(), (ARecordType) itemType, ridIndexes, retainInput, retainMissing,
+                    context.getMissingWriterFactory());
+
+            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo;
+            try {
+                compactionInfo = DatasetUtils.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
+            } catch (MetadataException e) {
+                throw new AlgebricksException("Unable to create merge policy factory for external dataset", e);
+            }
+
+            boolean temp = datasetDetails.isTemp();
+            // Create the file index data flow helper
+            ExternalBTreeDataflowHelperFactory indexDataflowHelperFactory = new ExternalBTreeDataflowHelperFactory(
+                    compactionInfo.first, compactionInfo.second,
+                    new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                    AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+                    metadataProvider.getStorageProperties().getBloomFilterFalsePositiveRate(),
+                    ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, metadataProvider), !temp);
+
+            // Create the out record descriptor, appContext and fileSplitProvider for the files index
+            RecordDescriptor outRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
+            IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
+            spPc = metadataProvider.splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(),
+                    dataset.getDatasetName(),
+                    dataset.getDatasetName().concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX), false);
+            ISearchOperationCallbackFactory searchOpCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
+                    : new SecondaryIndexSearchOperationCallbackFactory();
+            // Create the operator
+            ExternalLookupOperatorDescriptor op = new ExternalLookupOperatorDescriptor(jobSpec, adapterFactory,
+                    outRecDesc, indexDataflowHelperFactory, retainInput, appContext.getIndexLifecycleManagerProvider(),
+                    appContext.getStorageManagerInterface(), spPc.first, dataset.getDatasetId(),
+                    metadataProvider.getStorageProperties().getBloomFilterFalsePositiveRate(), searchOpCallbackFactory,
+                    retainMissing, context.getMissingWriterFactory());
+            return new Pair<>(op, spPc.second);
+        } catch (Exception e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getUpsertRuntime(
+            IDataSource<DataSourceId> dataSource, IOperatorSchema inputSchema, IVariableTypeEnvironment typeEnv,
+            List<LogicalVariable> primaryKeys, LogicalVariable payload, List<LogicalVariable> filterKeys,
+            List<LogicalVariable> additionalNonFilterFields, RecordDescriptor recordDesc, JobGenContext context,
+            JobSpecification spec) throws AlgebricksException {
+        String datasetName = dataSource.getId().getDatasourceName();
+        Dataset dataset = findDataset(dataSource.getId().getDataverseName(), datasetName);
+        if (dataset == null) {
+            throw new AlgebricksException(
+                    "Unknown dataset " + datasetName + " in dataverse " + dataSource.getId().getDataverseName());
+        }
+        boolean temp = dataset.getDatasetDetails().isTemp();
+        isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
+
+        int numKeys = primaryKeys.size();
+        int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
+        int numOfAdditionalFields = additionalNonFilterFields == null ? 0 : additionalNonFilterFields.size();
+        // Move key fields to front. [keys, record, filters]
+        int[] fieldPermutation = new int[numKeys + 1 + numFilterFields + numOfAdditionalFields];
+        int[] bloomFilterKeyFields = new int[numKeys];
+        int i = 0;
+        // set the keys' permutations
+        for (LogicalVariable varKey : primaryKeys) {
+            int idx = inputSchema.findVariable(varKey);
+            fieldPermutation[i] = idx;
+            bloomFilterKeyFields[i] = i;
+            i++;
+        }
+        // set the record permutation
+        fieldPermutation[i++] = inputSchema.findVariable(payload);
+        // set the filters' permutations
+
+        int numKeys = primaryKeys.size();
+        int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
+        int numOfAdditionalFields = additionalNonFilterFields == null ? 0 : additionalNonFilterFields.size();
+        // Move key fields to front. [keys, record, filters]
+        int[] fieldPermutation = new int[numKeys + 1 + numFilterFields + numOfAdditionalFields];
+        int[] bloomFilterKeyFields = new int[numKeys];
+        int i = 0;
+        // set the keys' permutations
+        for (LogicalVariable varKey : primaryKeys) {
+            int idx = inputSchema.findVariable(varKey);
+            fieldPermutation[i] = idx;
+            bloomFilterKeyFields[i] = i;
+            i++;
+        }
+        // set the record permutation
+        fieldPermutation[i++] = inputSchema.findVariable(payload);
+        // set the filters' permutations
+        if (numFilterFields > 0) {
+            int idx = inputSchema.findVariable(filterKeys.get(0));
+            fieldPermutation[i++] = idx;
+        }
+
+        if (additionalNonFilterFields != null) {
+            for (LogicalVariable var : additionalNonFilterFields) {
+                int idx = inputSchema.findVariable(var);
+                fieldPermutation[i++] = idx;
+            }
+        }
+
+        try {
+            Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+                    dataset.getDatasetName(), dataset.getDatasetName());
+            String indexName = primaryIndex.getIndexName();
+
+            String itemTypeName = dataset.getItemTypeName();
+            String itemTypeDataverseName = dataset.getItemTypeDataverseName();
+            ARecordType itemType = (ARecordType) MetadataManager.INSTANCE
+                    .getDatatype(mdTxnCtx, itemTypeDataverseName, itemTypeName).getDatatype();
+            ARecordType metaItemType = DatasetUtils.getMetaType(this, dataset);
+            ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType, metaItemType);
+            IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+            IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
+                    itemType, metaItemType, context.getBinaryComparatorFactoryProvider());
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
+                    splitProviderAndPartitionConstraintsForDataset(dataSource.getId().getDataverseName(), datasetName,
+                            indexName, temp);
+
+            // prepare callback
+            JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
+            int datasetId = dataset.getDatasetId();
+            int[] primaryKeyFields = new int[numKeys];
+            for (i = 0; i < numKeys; i++) {
+                primaryKeyFields[i] = i;
+            }
+
+            ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
+            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+                    itemType, context.getBinaryComparatorFactoryProvider());
+            int[] filterFields = DatasetUtils.createFilterFields(dataset);
+            int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
+
+            TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
+            IModificationOperationCallbackFactory modificationCallbackFactory = temp
+                    ? new TempDatasetPrimaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+                            primaryKeyFields, txnSubsystemProvider, IndexOperation.UPSERT, ResourceType.LSM_BTREE)
+                    : new UpsertOperationCallbackFactory(jobId, datasetId, primaryKeyFields, txnSubsystemProvider,
+                            IndexOperation.UPSERT, ResourceType.LSM_BTREE, dataset.hasMetaPart());
+
+            LockThenSearchOperationCallbackFactory searchCallbackFactory = new LockThenSearchOperationCallbackFactory(
+                    jobId, datasetId, primaryKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE);
+
+            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
+                    .getMergePolicyFactory(dataset, mdTxnCtx);
+            IIndexDataflowHelperFactory idfh = new LSMBTreeDataflowHelperFactory(
+                    new AsterixVirtualBufferCacheProvider(datasetId), compactionInfo.first, compactionInfo.second,
+                    new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                    AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+                    storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits, filterCmpFactories,
+                    btreeFields, filterFields, !temp);
+            AsterixLSMTreeUpsertOperatorDescriptor op;
+
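+            // Output layout of the upsert operator: the previous record, then (if present)
+            // the previous meta and previous filter value, followed by the fields of the
+            // incoming tuple, so downstream index-maintenance operators can remove stale
+            // secondary-index entries before inserting the new ones.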
+            ITypeTraits[] outputTypeTraits = new ITypeTraits[recordDesc.getFieldCount()
+                    + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
+            ISerializerDeserializer[] outputSerDes = new ISerializerDeserializer[recordDesc.getFieldCount()
+                    + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
+
+            // add the previous record first
+            int f = 0;
+            outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(itemType);
+            outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(itemType);
+            f++;
+            // add the previous meta second
+            if (dataset.hasMetaPart()) {
+                outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(
+                        metaItemType);
+                outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(metaItemType);
+                f++;
+            }
+            // add the previous filter third
+            int fieldIdx = -1;
+            if (numFilterFields > 0) {
+                String filterField = DatasetUtils.getFilterField(dataset).get(0);
+                for (i = 0; i < itemType.getFieldNames().length; i++) {
+                    if (itemType.getFieldNames()[i].equals(filterField)) {
+                        break;
+                    }
+                }
+                fieldIdx = i;
+                outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(itemType
+                        .getFieldTypes()[fieldIdx]);
+                outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider()
+                        .getSerializerDeserializer(itemType.getFieldTypes()[fieldIdx]);
+                f++;
+            }
+            for (int j = 0; j < recordDesc.getFieldCount(); j++) {
+                outputTypeTraits[j + f] = recordDesc.getTypeTraits()[j];
+                outputSerDes[j + f] = recordDesc.getFields()[j];
+            }
+
+            RecordDescriptor outputRecordDesc = new RecordDescriptor(outputSerDes, outputTypeTraits);
+            op = new AsterixLSMTreeUpsertOperatorDescriptor(spec, outputRecordDesc,
+                    appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
+                    splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields, fieldPermutation,
+                    idfh, null, true, indexName, context.getMissingWriterFactory(), modificationCallbackFactory,
+                    searchCallbackFactory, null);
+            op.setType(itemType);
+            op.setFilterIndex(fieldIdx);
+            return new Pair<>(op, splitsAndConstraint.second);
+
+        } catch (MetadataException me) {
+            throw new AlgebricksException(me);
+        }
+    }
+
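+    // Builds a full-scan runtime over an external dataset: the adapter created by the
+    // given factory reads the external data and emits one record field per tuple.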
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDatasetDataScannerRuntime(
+            JobSpecification jobSpec, IAType itemType, IAdapterFactory adapterFactory, IDataFormat format)
+            throws AlgebricksException {
+        if (itemType.getTypeTag() != ATypeTag.RECORD) {
+            throw new AlgebricksException("Can only scan datasets of records.");
+        }
+
+        ISerializerDeserializer<?> payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
+        RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
+
+        ExternalDataScanOperatorDescriptor dataScanner = new ExternalDataScanOperatorDescriptor(jobSpec, scannerDesc,
+                adapterFactory);
+
+        AlgebricksPartitionConstraint constraint;
+        try {
+            constraint = adapterFactory.getPartitionConstraint();
+        } catch (Exception e) {
+            throw new AlgebricksException(e);
+        }
+
+        return new Pair<>(dataScanner, constraint);
+    }
+
+    private Pair<IBinaryComparatorFactory[], ITypeTraits[]> getComparatorFactoriesAndTypeTraitsOfSecondaryBTreeIndex(
+            List<List<String>> sidxKeyFieldNames, List<IAType> sidxKeyFieldTypes, List<List<String>> pidxKeyFieldNames,
+            ARecordType recType, DatasetType dsType, boolean hasMeta, List<Integer> primaryIndexKeyIndicators,
+            List<Integer> secondaryIndexIndicators, ARecordType metaType) throws AlgebricksException {
+
+        IBinaryComparatorFactory[] comparatorFactories;
+        ITypeTraits[] typeTraits;
+        int sidxKeyFieldCount = sidxKeyFieldNames.size();
+        int pidxKeyFieldCount = pidxKeyFieldNames.size();
+        typeTraits = new ITypeTraits[sidxKeyFieldCount + pidxKeyFieldCount];
+        comparatorFactories = new IBinaryComparatorFactory[sidxKeyFieldCount + pidxKeyFieldCount];
+
+        int i = 0;
+        for (; i < sidxKeyFieldCount; ++i) {
+            Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(sidxKeyFieldTypes.get(i),
+                    sidxKeyFieldNames.get(i),
+                    (hasMeta && secondaryIndexIndicators.get(i).intValue() == 1) ? metaType : recType);
+            IAType keyType = keyPairType.first;
+            comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
+                    true);
+            typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+        }
+
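+        // Secondary-index entries are laid out as [secondary keys | primary keys], so the
+        // primary-key comparators and type traits are appended after the secondary-key ones.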
+        for (int j = 0; j < pidxKeyFieldCount; ++j, ++i) {
+            IAType keyType = null;
+            try {
+                switch (dsType) {
+                    case INTERNAL:
+                        keyType = (hasMeta && primaryIndexKeyIndicators.get(j).intValue() == 1)
+                                ? metaType.getSubFieldType(pidxKeyFieldNames.get(j))
+                                : recType.getSubFieldType(pidxKeyFieldNames.get(j));
+                        break;
+                    case EXTERNAL:
+                        keyType = IndexingConstants.getFieldType(j);
+                        break;
+                    default:
+                        throw new AlgebricksException("Unknown dataset type " + dsType);
+                }
+            } catch (AsterixException e) {
+                throw new AlgebricksException(e);
+            }
+            comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
+                    true);
+            typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+        }
+
+        return new Pair<>(comparatorFactories, typeTraits);
+    }
+
+    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertOrDeleteRuntime(IndexOperation indexOp,
+            IDataSource<DataSourceId> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
+            LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields, RecordDescriptor recordDesc,
+            JobGenContext context, JobSpecification spec, boolean bulkload,
+            List<LogicalVariable> additionalNonFilteringFields) throws AlgebricksException {
+
+        String datasetName = dataSource.getId().getDatasourceName();
+        Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataSource.getId().getDataverseName(),
+                datasetName);
+        boolean temp = dataset.getDatasetDetails().isTemp();
+        isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
+
+        int numKeys = keys.size();
+        int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
+        // Move key fields to front.
+        int[] fieldPermutation = new int[numKeys + 1 + numFilterFields
+                + (additionalNonFilteringFields == null ? 0 : additionalNonFilteringFields.size())];
+        int[] bloomFilterKeyFields = new int[numKeys];
+        int i = 0;
+        for (LogicalVariable varKey : keys) {
+            int idx = propagatedSchema.findVariable(varKey);
+            fieldPermutation[i] = idx;
+            bloomFilterKeyFields[i] = i;
+            i++;
+        }
+        fieldPermutation[i++] = propagatedSchema.findVariable(payload);
+        if (numFilterFields > 0) {
+            int idx = propagatedSchema.findVariable(additionalNonKeyFields.get(0));
+            fieldPermutation[i++] = idx;
+        }
+        if (additionalNonFilteringFields != null) {
+            for (LogicalVariable variable : additionalNonFilteringFields) {
+                int idx = propagatedSchema.findVariable(variable);
+                fieldPermutation[i++] = idx;
+            }
+        }
+
+        try {
+            Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+                    dataset.getDatasetName(), dataset.getDatasetName());
+            String indexName = primaryIndex.getIndexName();
+            ARecordType itemType = (ARecordType) MetadataManager.INSTANCE
+                    .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), dataset.getItemTypeName()).getDatatype();
+            ARecordType metaItemType = DatasetUtils.getMetaType(this, dataset);
+            ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType, metaItemType);
+
+            IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+            IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
+                    itemType, metaItemType, context.getBinaryComparatorFactoryProvider());
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
+                    splitProviderAndPartitionConstraintsForDataset(dataSource.getId().getDataverseName(), datasetName,
+                            indexName, temp);
+
+            // prepare callback
+            JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
+            int datasetId = dataset.getDatasetId();
+            int[] primaryKeyFields = new int[numKeys];
+            for (i = 0; i < numKeys; i++) {
+                primaryKeyFields[i] = i;
+            }
+
+            ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
+            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+                    itemType, context.getBinaryComparatorFactoryProvider());
+            int[] filterFields = DatasetUtils.createFilterFields(dataset);
+            int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
+
+            TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
+            IModificationOperationCallbackFactory modificationCallbackFactory = temp
+                    ? new TempDatasetPrimaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+                            primaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE)
+                    : new PrimaryIndexModificationOperationCallbackFactory(jobId, datasetId, primaryKeyFields,
+                            txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE, dataset.hasMetaPart());
+
+            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
+                    .getMergePolicyFactory(dataset, mdTxnCtx);
+            IIndexDataflowHelperFactory idfh = new LSMBTreeDataflowHelperFactory(
+                    new AsterixVirtualBufferCacheProvider(datasetId), compactionInfo.first, compactionInfo.second,
+                    new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                    AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+                    storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits, filterCmpFactories,
+                    btreeFields, filterFields, !temp);
+            IOperatorDescriptor op;
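+            // Bulk load writes sorted input directly into new disk components; the
+            // insert/delete path goes through the LSM memory component and the
+            // transactional modification callback instead.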
+            if (bulkload) {
+                long numElementsHint = getCardinalityPerPartitionHint(dataset);
+                op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, appContext.getStorageManagerInterface(),
+                        appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
+                        comparatorFactories, bloomFilterKeyFields, fieldPermutation,
+                        GlobalConfig.DEFAULT_TREE_FILL_FACTOR, true, numElementsHint, true, idfh);
+            } else {
+                op = new AsterixLSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc,
+                        appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
+                        splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields,
+                        fieldPermutation, indexOp, idfh, null, true, indexName, null, modificationCallbackFactory,
+                        NoOpOperationCallbackFactory.INSTANCE);
+            }
+            return new Pair<>(op, splitsAndConstraint.second);
+        } catch (MetadataException me) {
+            throw new AlgebricksException(me);
+        }
+    }
+
+    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertOrDeleteOrUpsertRuntime(
+            IndexOperation indexOp, IDataSourceIndex<String, DataSourceId> dataSourceIndex,
+            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv,
+            List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
+            List<LogicalVariable> additionalNonKeyFields, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
+            JobGenContext context, JobSpecification spec, boolean bulkload, List<LogicalVariable> prevSecondaryKeys,
+            LogicalVariable prevAdditionalFilteringKey) throws AlgebricksException {
+        String indexName = dataSourceIndex.getId();
+        String dataverseName = dataSourceIndex.getDataSource().getId().getDataverseName();
+        String datasetName = dataSourceIndex.getDataSource().getId().getDatasourceName();
+
+        Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
+        Index secondaryIndex;
+        try {
+            secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+                    dataset.getDatasetName(), indexName);
+        } catch (MetadataException e) {
+            throw new AlgebricksException(e);
+        }
+
+        ArrayList<LogicalVariable> prevAdditionalFilteringKeys = null;
+        if (indexOp == IndexOperation.UPSERT && prevAdditionalFilteringKey != null) {
+            prevAdditionalFilteringKeys = new ArrayList<>();
+            prevAdditionalFilteringKeys.add(prevAdditionalFilteringKey);
+        }
+        AsterixTupleFilterFactory filterFactory = createTupleFilterFactory(inputSchemas, typeEnv, filterExpr, context);
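+        // Dispatch on the index type: each secondary-index flavor (B-tree, R-tree,
+        // inverted-index variants) has its own runtime builder.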
+        switch (secondaryIndex.getIndexType()) {
+            case BTREE:
+                return getBTreeRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
+                        secondaryKeys, additionalNonKeyFields, filterFactory, recordDesc, context, spec, indexOp,
+                        bulkload, prevSecondaryKeys, prevAdditionalFilteringKeys);
+            case RTREE:
+                return getRTreeRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
+                        secondaryKeys, additionalNonKeyFields, filterFactory, recordDesc, context, spec, indexOp,
+                        bulkload, prevSecondaryKeys, prevAdditionalFilteringKeys);
+            case SINGLE_PARTITION_WORD_INVIX:
+            case SINGLE_PARTITION_NGRAM_INVIX:
+            case LENGTH_PARTITIONED_WORD_INVIX:
+            case LENGTH_PARTITIONED_NGRAM_INVIX:
+                return getInvertedIndexRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
+                        secondaryKeys, additionalNonKeyFields, filterFactory, recordDesc, context, spec, indexOp,
+                        secondaryIndex.getIndexType(), bulkload, prevSecondaryKeys, prevAdditionalFilteringKeys);
+            default:
+                throw new AlgebricksException(indexOp.name() + " not implemented for index type: "
+                        + secondaryIndex.getIndexType());
+        }
+    }
+
+    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBTreeRuntime(String dataverseName,
+            String datasetName, String indexName, IOperatorSchema propagatedSchema, List<LogicalVariable> primaryKeys,
+            List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
+            AsterixTupleFilterFactory filterFactory, RecordDescriptor recordDesc, JobGenContext context,
+            JobSpecification spec, IndexOperation indexOp, boolean bulkload, List<LogicalVariable> prevSecondaryKeys,
+            List<LogicalVariable> prevAdditionalFilteringKeys) throws AlgebricksException {
+        Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
+        boolean temp = dataset.getDatasetDetails().isTemp();
+        isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
+
+        int numKeys = primaryKeys.size() + secondaryKeys.size();
+        int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
+
+        // generate field permutations
+        int[] fieldPermutation = new int[numKeys + numFilterFields];
+        int[] bloomFilterKeyFields = new int[secondaryKeys.size()];
+        int[] modificationCallbackPrimaryKeyFields = new int[primaryKeys.size()];
+        int i = 0;
+        int j = 0;
+        for (LogicalVariable varKey : secondaryKeys) {
+            int idx = propagatedSchema.findVariable(varKey);
+            fieldPermutation[i] = idx;
+            bloomFilterKeyFields[i] = i;
+            i++;
+        }
+
<TRUNCATED>