http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/InvertedIndexResourceFactoryProvider.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/InvertedIndexResourceFactoryProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/InvertedIndexResourceFactoryProvider.java new file mode 100644 index 0000000..362305e --- /dev/null +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/InvertedIndexResourceFactoryProvider.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.metadata.utils; + +import java.util.List; +import java.util.Map; + +import org.apache.asterix.common.config.DatasetConfig.DatasetType; +import org.apache.asterix.common.config.DatasetConfig.IndexType; +import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider; +import org.apache.asterix.common.context.IStorageComponentProvider; +import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.exceptions.CompilationException; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.metadata.api.IResourceFactoryProvider; +import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.asterix.metadata.entities.Dataset; +import org.apache.asterix.metadata.entities.Index; +import org.apache.asterix.om.types.ARecordType; +import org.apache.asterix.om.types.IAType; +import org.apache.asterix.om.utils.NonTaggedFormatUtil; +import org.apache.asterix.om.utils.RecordUtil; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.common.utils.Pair; +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; +import org.apache.hyracks.api.dataflow.value.ITypeTraits; +import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory; +import org.apache.hyracks.data.std.primitive.ShortPointable; +import org.apache.hyracks.storage.am.common.api.IMetadataPageManagerFactory; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory; +import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexLocalResourceFactory; +import 
org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
+import org.apache.hyracks.storage.common.IResourceFactory;
+import org.apache.hyracks.storage.common.IStorageManager;
+
+/**
+ * Provides the local resource factory for LSM inverted indexes (word and n-gram, optionally
+ * length-partitioned) on internal datasets.
+ */
+public class InvertedIndexResourceFactoryProvider implements IResourceFactoryProvider {
+    public static final InvertedIndexResourceFactoryProvider INSTANCE = new InvertedIndexResourceFactoryProvider();
+
+    private InvertedIndexResourceFactoryProvider() {
+    }
+
+    /**
+     * Builds an {@link LSMInvertedIndexLocalResourceFactory} for {@code index} on {@code dataset}.
+     *
+     * @throws CompilationException
+     *             if the dataset is not internal, or the index key field cannot be resolved
+     * @throws AsterixException
+     *             if the dataset has a composite primary key or the index has more than one key field
+     */
+    @Override
+    public IResourceFactory getResourceFactory(MetadataProvider mdProvider, Dataset dataset, Index index,
+            ARecordType recordType, ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory,
+            Map<String, String> mergePolicyProperties, ITypeTraits[] filterTypeTraits,
+            IBinaryComparatorFactory[] filterCmpFactories) throws AlgebricksException {
+        // Get basic info
+        List<List<String>> primaryKeys = dataset.getPrimaryKeys();
+        List<List<String>> secondaryKeys = index.getKeyFieldNames();
+        List<String> filterFieldName = DatasetUtil.getFilterField(dataset);
+        int numPrimaryKeys = primaryKeys.size();
+        int numSecondaryKeys = secondaryKeys.size();
+        // Validate
+        if (dataset.getDatasetType() != DatasetType.INTERNAL) {
+            throw new CompilationException(ErrorCode.COMPILATION_INDEX_TYPE_NOT_SUPPORTED_FOR_DATASET_TYPE,
+                    index.getIndexType().name(), dataset.getDatasetType());
+        }
+        if (numPrimaryKeys > 1) {
+            throw new AsterixException("Cannot create inverted index on dataset with composite primary key.");
+        }
+        if (numSecondaryKeys > 1) {
+            throw new AsterixException("Cannot create composite inverted index on multiple fields.");
+        }
+        boolean isPartitioned = isPartitioned(index.getIndexType());
+        // Token fields (one, or two when length-partitioned) followed by the primary key(s).
+        int numTokenKeyPairFields = (isPartitioned ? 2 : 1) + numPrimaryKeys;
+        int[] invertedIndexFields = null;
+        int[] secondaryFilterFieldsForNonBulkLoadOps = null;
+        int[] invertedIndexFieldsForNonBulkLoadOps = null;
+        int[] secondaryFilterFields = null;
+        if (filterFieldName != null) {
+            invertedIndexFields = identityFields(numTokenKeyPairFields);
+            // NOTE(review): filterFieldName is the path of one filter field, and only slot 0 was ever populated.
+            // The filter-field arrays therefore hold exactly one column index (as in the R-tree provider).
+            // Sizing them by the path length would leave spurious 0 entries for nested filter paths.
+            secondaryFilterFieldsForNonBulkLoadOps = new int[] { numSecondaryKeys + numPrimaryKeys };
+            invertedIndexFieldsForNonBulkLoadOps = identityFields(numSecondaryKeys + numPrimaryKeys);
+            secondaryFilterFields = new int[] { numTokenKeyPairFields };
+        }
+        IStorageComponentProvider storageComponentProvider = mdProvider.getStorageComponentProvider();
+        IStorageManager storageManager = storageComponentProvider.getStorageManager();
+        ILSMOperationTrackerFactory opTrackerFactory = dataset.getIndexOperationTrackerFactory(index);
+        ILSMIOOperationCallbackFactory ioOpCallbackFactory = dataset.getIoOperationCallbackFactory(index);
+        IMetadataPageManagerFactory metadataPageManagerFactory =
+                storageComponentProvider.getMetadataPageManagerFactory();
+        AsterixVirtualBufferCacheProvider vbcProvider = new AsterixVirtualBufferCacheProvider(dataset.getDatasetId());
+        ILSMIOOperationSchedulerProvider ioSchedulerProvider =
+                storageComponentProvider.getIoOperationSchedulerProvider();
+        boolean durable = !dataset.isTemp();
+        double bloomFilterFalsePositiveRate = mdProvider.getStorageProperties().getBloomFilterFalsePositiveRate();
+        ITypeTraits[] typeTraits = getInvListTypeTraits(mdProvider, dataset, recordType, metaType);
+        IBinaryComparatorFactory[] cmpFactories =
+                getInvListComparatorFactories(mdProvider, dataset, recordType, metaType);
+        // The single key field's type is resolved once and shared by token traits, comparators and tokenizer.
+        IAType secondaryKeyType = getSecondaryKeyType(dataset, index, recordType, metaType);
+        ITypeTraits[] tokenTypeTraits = getTokenTypeTraits(secondaryKeyType, isPartitioned);
+        IBinaryComparatorFactory[] tokenCmpFactories = getTokenComparatorFactories(secondaryKeyType, isPartitioned);
+        IBinaryTokenizerFactory tokenizerFactory = getTokenizerFactory(secondaryKeyType, index);
+        return new LSMInvertedIndexLocalResourceFactory(storageManager, typeTraits, cmpFactories, filterTypeTraits,
+                filterCmpFactories, secondaryFilterFields, opTrackerFactory, ioOpCallbackFactory,
+                metadataPageManagerFactory, vbcProvider, ioSchedulerProvider, mergePolicyFactory, mergePolicyProperties,
+                durable, tokenTypeTraits, tokenCmpFactories, tokenizerFactory, isPartitioned, invertedIndexFields,
+                secondaryFilterFieldsForNonBulkLoadOps, invertedIndexFieldsForNonBulkLoadOps,
+                bloomFilterFalsePositiveRate);
+    }
+
+    /** Returns true for the length-partitioned (word or n-gram) inverted index types. */
+    private static boolean isPartitioned(IndexType indexType) {
+        return indexType == IndexType.LENGTH_PARTITIONED_WORD_INVIX
+                || indexType == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX;
+    }
+
+    /** Returns the field permutation {@code [0, 1, ..., n - 1]}. */
+    private static int[] identityFields(int n) {
+        int[] fields = new int[n];
+        for (int i = 0; i < n; i++) {
+            fields[i] = i;
+        }
+        return fields;
+    }
+
+    /**
+     * Returns the type traits of the inverted-list elements: one entry per primary key, matching
+     * {@link #getInvListComparatorFactories}.
+     */
+    private static ITypeTraits[] getInvListTypeTraits(MetadataProvider metadataProvider, Dataset dataset,
+            ARecordType recordType, ARecordType metaType) throws AlgebricksException {
+        ITypeTraits[] primaryTypeTraits = dataset.getPrimaryTypeTraits(metadataProvider, recordType, metaType);
+        // NOTE(review): primaryTypeTraits is assumed to be [primary keys..., record, (meta)]. Dropping only the
+        // last entry would leak the record's trait into the inverted list when a meta part is present. Copying
+        // exactly the primary-key prefix keeps the traits aligned with the primary comparators.
+        int numPrimaryKeys = dataset.getPrimaryKeys().size();
+        ITypeTraits[] typeTraits = new ITypeTraits[numPrimaryKeys];
+        System.arraycopy(primaryTypeTraits, 0, typeTraits, 0, numPrimaryKeys);
+        return typeTraits;
+    }
+
+    /** Inverted-list elements are primary keys, so they are compared with the primary key comparators. */
+    private static IBinaryComparatorFactory[] getInvListComparatorFactories(MetadataProvider metadataProvider,
+            Dataset dataset, ARecordType recordType, ARecordType metaType) throws AlgebricksException {
+        return dataset.getPrimaryComparatorFactories(metadataProvider, recordType, metaType);
+    }
+
+    /**
+     * Resolves the non-nullable type of the index's single key field. The field is looked up in the meta
+     * type when its key source indicator is non-zero, and in the record type otherwise.
+     *
+     * @throws CompilationException
+     *             if the dataset has a composite primary key, the index has more than one key field, or the
+     *             key field cannot be found
+     */
+    private static IAType getSecondaryKeyType(Dataset dataset, Index index, ARecordType recordType,
+            ARecordType metaType) throws AlgebricksException {
+        int numPrimaryKeys = dataset.getPrimaryKeys().size();
+        int numSecondaryKeys = index.getKeyFieldNames().size();
+        IndexType indexType = index.getIndexType();
+        // Sanity checks.
+        if (numPrimaryKeys > 1) {
+            throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_INDEX_FOR_DATASET_WITH_COMPOSITE_PRIMARY_INDEX,
+                    indexType, RecordUtil.toFullyQualifiedName(dataset.getDataverseName(), dataset.getDatasetName()));
+        }
+        if (numSecondaryKeys > 1) {
+            throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_INDEX_NUM_OF_FIELD, numSecondaryKeys,
+                    indexType, 1);
+        }
+        List<Integer> keySourceIndicators = index.getKeyFieldSourceIndicators();
+        ARecordType sourceType =
+                (keySourceIndicators == null || keySourceIndicators.get(0) == 0) ? recordType : metaType;
+        List<String> keyFieldName = index.getKeyFieldNames().get(0);
+        Pair<IAType, Boolean> keyTypePair =
+                Index.getNonNullableOpenFieldType(index.getKeyFieldTypes().get(0), keyFieldName, sourceType);
+        IAType secondaryKeyType = keyTypePair.first;
+        if (secondaryKeyType == null) {
+            // Report a missing key field explicitly rather than failing later on secondaryKeyType.getTypeTag().
+            throw new CompilationException(ErrorCode.COMPILATION_FIELD_NOT_FOUND, String.join(".", keyFieldName));
+        }
+        return secondaryKeyType;
+    }
+
+    /** Type traits of the token fields: the token, plus the partitioning length field when partitioned. */
+    private static ITypeTraits[] getTokenTypeTraits(IAType secondaryKeyType, boolean isPartitioned)
+            throws AlgebricksException {
+        ITypeTraits[] tokenTypeTraits = new ITypeTraits[isPartitioned ? 2 : 1];
+        tokenTypeTraits[0] = NonTaggedFormatUtil.getTokenTypeTrait(secondaryKeyType);
+        if (isPartitioned) {
+            // The partitioning field is hardcoded to be a short *without* an Asterix type tag.
+            tokenTypeTraits[1] = ShortPointable.TYPE_TRAITS;
+        }
+        return tokenTypeTraits;
+    }
+
+    /** Comparators for the token fields, parallel to {@link #getTokenTypeTraits}. */
+    private static IBinaryComparatorFactory[] getTokenComparatorFactories(IAType secondaryKeyType,
+            boolean isPartitioned) throws AlgebricksException {
+        IBinaryComparatorFactory[] tokenComparatorFactories = new IBinaryComparatorFactory[isPartitioned ? 2 : 1];
+        tokenComparatorFactories[0] = NonTaggedFormatUtil.getTokenBinaryComparatorFactory(secondaryKeyType);
+        if (isPartitioned) {
+            // The partitioning field is hardcoded to be a short *without* an Asterix type tag.
+            tokenComparatorFactories[1] = PointableBinaryComparatorFactory.of(ShortPointable.FACTORY);
+        }
+        return tokenComparatorFactories;
+    }
+
+    /** Chooses the tokenizer from the key's type tag, the index type and the n-gram length. */
+    private static IBinaryTokenizerFactory getTokenizerFactory(IAType secondaryKeyType, Index index)
+            throws AlgebricksException {
+        // TODO: We might want to expose the hashing option at the AQL level,
+        // and add the choice to the index metadata.
+        return NonTaggedFormatUtil.getBinaryTokenizerFactory(secondaryKeyType.getTypeTag(), index.getIndexType(),
+                index.getGramLength());
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/RTreeDataflowHelperFactoryProvider.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/RTreeDataflowHelperFactoryProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/RTreeDataflowHelperFactoryProvider.java deleted file mode 100644 index 45034de..0000000 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/RTreeDataflowHelperFactoryProvider.java +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.asterix.metadata.utils; - -import java.util.List; -import java.util.Map; - -import org.apache.asterix.common.config.DatasetConfig.DatasetType; -import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider; -import org.apache.asterix.common.context.IStorageComponentProvider; -import org.apache.asterix.common.exceptions.CompilationException; -import org.apache.asterix.common.exceptions.ErrorCode; -import org.apache.asterix.external.indexing.IndexingConstants; -import org.apache.asterix.metadata.api.IIndexDataflowHelperFactoryProvider; -import org.apache.asterix.metadata.declared.MetadataProvider; -import org.apache.asterix.metadata.entities.Dataset; -import org.apache.asterix.metadata.entities.Index; -import org.apache.asterix.metadata.entities.InternalDatasetDetails; -import org.apache.asterix.metadata.lock.ExternalDatasetsRegistry; -import org.apache.asterix.om.types.ARecordType; -import org.apache.asterix.om.types.ATypeTag; -import org.apache.asterix.om.types.IAType; -import org.apache.asterix.om.utils.NonTaggedFormatUtil; -import org.apache.commons.lang.StringUtils; -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; -import org.apache.hyracks.api.dataflow.value.ITypeTraits; -import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory; -import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; -import org.apache.hyracks.storage.am.lsm.rtree.dataflow.ExternalRTreeDataflowHelperFactory; -import org.apache.hyracks.storage.am.lsm.rtree.dataflow.LSMRTreeWithAntiMatterTuplesDataflowHelperFactory; -import org.apache.hyracks.storage.am.rtree.frames.RTreePolicyType; - -public class RTreeDataflowHelperFactoryProvider implements IIndexDataflowHelperFactoryProvider { - - public static final 
RTreeDataflowHelperFactoryProvider INSTANCE = new RTreeDataflowHelperFactoryProvider(); - - private RTreeDataflowHelperFactoryProvider() { - } - - protected RTreePolicyType rTreePolicyType() { - return RTreePolicyType.RTREE; - } - - @Override - public IIndexDataflowHelperFactory getIndexDataflowHelperFactory(MetadataProvider mdProvider, Dataset dataset, - Index index, ARecordType recordType, ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory, - Map<String, String> mergePolicyProperties, ITypeTraits[] filterTypeTraits, - IBinaryComparatorFactory[] filterCmpFactories) throws AlgebricksException { - if (index.getKeyFieldNames().size() != 1) { - throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_INDEX_NUM_OF_FIELD, - index.getKeyFieldNames().size(), index.getIndexType(), 1); - } - IAType spatialType = Index.getNonNullableOpenFieldType(index.getKeyFieldTypes().get(0), - index.getKeyFieldNames().get(0), recordType).first; - if (spatialType == null) { - throw new CompilationException(ErrorCode.COMPILATION_FIELD_NOT_FOUND, - StringUtils.join(index.getKeyFieldNames().get(0), '.')); - } - List<List<String>> primaryKeyFields = DatasetUtil.getPartitioningKeys(dataset); - int numPrimaryKeys = primaryKeyFields.size(); - ITypeTraits[] primaryTypeTraits = null; - IBinaryComparatorFactory[] primaryComparatorFactories = null; - IStorageComponentProvider storageComponentProvider = mdProvider.getStorageComponentProvider(); - if (dataset.getDatasetType() == DatasetType.INTERNAL) { - primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1 + (dataset.hasMetaPart() ? 1 : 0)]; - primaryComparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys]; - List<Integer> indicators = null; - if (dataset.hasMetaPart()) { - indicators = ((InternalDatasetDetails) dataset.getDatasetDetails()).getKeySourceIndicator(); - } - for (int i = 0; i < numPrimaryKeys; i++) { - IAType keyType = (indicators == null || indicators.get(i) == 0) - ? 
recordType.getSubFieldType(primaryKeyFields.get(i)) - : metaType.getSubFieldType(primaryKeyFields.get(i)); - primaryComparatorFactories[i] = storageComponentProvider.getComparatorFactoryProvider() - .getBinaryComparatorFactory(keyType, true); - primaryTypeTraits[i] = storageComponentProvider.getTypeTraitProvider().getTypeTrait(keyType); - } - primaryTypeTraits[numPrimaryKeys] = - storageComponentProvider.getTypeTraitProvider().getTypeTrait(recordType); - if (dataset.hasMetaPart()) { - primaryTypeTraits[numPrimaryKeys + 1] = - storageComponentProvider.getTypeTraitProvider().getTypeTrait(recordType); - } - } - boolean isPointMBR = - spatialType.getTypeTag() == ATypeTag.POINT || spatialType.getTypeTag() == ATypeTag.POINT3D; - int numDimensions = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag()); - int numNestedSecondaryKeyFields = numDimensions * 2; - IBinaryComparatorFactory[] secondaryComparatorFactories = - new IBinaryComparatorFactory[numNestedSecondaryKeyFields]; - IPrimitiveValueProviderFactory[] valueProviderFactories = - new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields]; - ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numNestedSecondaryKeyFields + numPrimaryKeys]; - IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag()); - ATypeTag keyType = nestedKeyType.getTypeTag(); - for (int i = 0; i < numNestedSecondaryKeyFields; i++) { - secondaryComparatorFactories[i] = storageComponentProvider.getComparatorFactoryProvider() - .getBinaryComparatorFactory(nestedKeyType, true); - secondaryTypeTraits[i] = storageComponentProvider.getTypeTraitProvider().getTypeTrait(nestedKeyType); - valueProviderFactories[i] = storageComponentProvider.getPrimitiveValueProviderFactory(); - - } - for (int i = 0; i < numPrimaryKeys; i++) { - secondaryTypeTraits[numNestedSecondaryKeyFields + i] = (dataset.getDatasetType() == DatasetType.INTERNAL) - ? 
primaryTypeTraits[i] : IndexingConstants.getTypeTraits(i); - } - int[] rtreeFields = null; - if (filterTypeTraits != null && filterTypeTraits.length > 0) { - rtreeFields = new int[numNestedSecondaryKeyFields + numPrimaryKeys]; - for (int i = 0; i < rtreeFields.length; i++) { - rtreeFields[i] = i; - } - } - if (dataset.getDatasetType() == DatasetType.INTERNAL) { - int[] secondaryFilterFields = (filterTypeTraits != null && filterTypeTraits.length > 0) - ? new int[] { numNestedSecondaryKeyFields + numPrimaryKeys } : null; - IBinaryComparatorFactory[] btreeCompFactories = getComparatorFactoriesForDeletedKeyBTree( - secondaryTypeTraits, primaryComparatorFactories, secondaryComparatorFactories); - return new LSMRTreeWithAntiMatterTuplesDataflowHelperFactory(valueProviderFactories, rTreePolicyType(), - btreeCompFactories, new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), - mergePolicyFactory, mergePolicyProperties, dataset.getIndexOperationTrackerFactory(index), - storageComponentProvider.getIoOperationSchedulerProvider(), - dataset.getIoOperationCallbackFactory(index), - MetadataProvider.proposeLinearizer(keyType, secondaryComparatorFactories.length), rtreeFields, - filterTypeTraits, filterCmpFactories, secondaryFilterFields, !dataset.getDatasetDetails().isTemp(), - isPointMBR); - } else { - return new ExternalRTreeDataflowHelperFactory(valueProviderFactories, rTreePolicyType(), - ExternalIndexingOperations.getBuddyBtreeComparatorFactories(), mergePolicyFactory, - mergePolicyProperties, dataset.getIndexOperationTrackerFactory(index), - storageComponentProvider.getIoOperationSchedulerProvider(), - dataset.getIoOperationCallbackFactory(index), - MetadataProvider.proposeLinearizer(keyType, secondaryComparatorFactories.length), - mdProvider.getStorageProperties().getBloomFilterFalsePositiveRate(), - new int[] { numNestedSecondaryKeyFields }, - ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(dataset), true, isPointMBR); - } - } - - private static 
IBinaryComparatorFactory[] getComparatorFactoriesForDeletedKeyBTree( - ITypeTraits[] secondaryTypeTraits, IBinaryComparatorFactory[] primaryComparatorFactories, - IBinaryComparatorFactory[] secondaryComparatorFactories) { - IBinaryComparatorFactory[] btreeCompFactories = new IBinaryComparatorFactory[secondaryTypeTraits.length]; - int i = 0; - for (; i < secondaryComparatorFactories.length; i++) { - btreeCompFactories[i] = secondaryComparatorFactories[i]; - } - for (int j = 0; i < secondaryTypeTraits.length; i++, j++) { - btreeCompFactories[i] = primaryComparatorFactories[j]; - } - return btreeCompFactories; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/RTreeResourceFactoryProvider.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/RTreeResourceFactoryProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/RTreeResourceFactoryProvider.java new file mode 100644 index 0000000..425f8a1 --- /dev/null +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/RTreeResourceFactoryProvider.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.metadata.utils; + +import java.util.List; +import java.util.Map; + +import org.apache.asterix.common.config.DatasetConfig.DatasetType; +import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider; +import org.apache.asterix.common.context.IStorageComponentProvider; +import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.exceptions.CompilationException; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.external.indexing.IndexingConstants; +import org.apache.asterix.metadata.api.IResourceFactoryProvider; +import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.asterix.metadata.entities.Dataset; +import org.apache.asterix.metadata.entities.Index; +import org.apache.asterix.metadata.entities.InternalDatasetDetails; +import org.apache.asterix.om.types.ARecordType; +import org.apache.asterix.om.types.ATypeTag; +import org.apache.asterix.om.types.IAType; +import org.apache.asterix.om.utils.NonTaggedFormatUtil; +import org.apache.commons.lang.StringUtils; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.common.utils.Pair; +import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider; +import org.apache.hyracks.algebricks.data.ITypeTraitProvider; +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; +import org.apache.hyracks.api.dataflow.value.ILinearizeComparatorFactory; +import 
org.apache.hyracks.api.dataflow.value.ITypeTraits; +import org.apache.hyracks.storage.am.common.api.IMetadataPageManagerFactory; +import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory; +import org.apache.hyracks.storage.am.lsm.rtree.dataflow.ExternalRTreeLocalResourceFactory; +import org.apache.hyracks.storage.am.lsm.rtree.dataflow.LSMRTreeWithAntiMatterLocalResourceFactory; +import org.apache.hyracks.storage.am.rtree.frames.RTreePolicyType; +import org.apache.hyracks.storage.common.IResourceFactory; +import org.apache.hyracks.storage.common.IStorageManager; + +public class RTreeResourceFactoryProvider implements IResourceFactoryProvider { + + private static final RTreePolicyType rTreePolicyType = RTreePolicyType.RTREE; + public static final RTreeResourceFactoryProvider INSTANCE = new RTreeResourceFactoryProvider(); + + private RTreeResourceFactoryProvider() { + } + + @Override + public IResourceFactory getResourceFactory(MetadataProvider mdProvider, Dataset dataset, Index index, + ARecordType recordType, ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory, + Map<String, String> mergePolicyProperties, ITypeTraits[] filterTypeTraits, + IBinaryComparatorFactory[] filterCmpFactories) throws AlgebricksException { + if (index.getKeyFieldNames().size() != 1) { + throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_INDEX_NUM_OF_FIELD, + index.getKeyFieldNames().size(), index.getIndexType(), 1); + } + IAType spatialType = Index.getNonNullableOpenFieldType(index.getKeyFieldTypes().get(0), + index.getKeyFieldNames().get(0), recordType).first; + if (spatialType == null) { + throw new 
CompilationException(ErrorCode.COMPILATION_FIELD_NOT_FOUND, + StringUtils.join(index.getKeyFieldNames().get(0), '.')); + } + List<List<String>> primaryKeyFields = dataset.getPrimaryKeys(); + int numPrimaryKeys = primaryKeyFields.size(); + ITypeTraits[] primaryTypeTraits = null; + IBinaryComparatorFactory[] primaryComparatorFactories = null; + IStorageComponentProvider storageComponentProvider = mdProvider.getStorageComponentProvider(); + if (dataset.getDatasetType() == DatasetType.INTERNAL) { + primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1 + (dataset.hasMetaPart() ? 1 : 0)]; + primaryComparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys]; + List<Integer> indicators = null; + if (dataset.hasMetaPart()) { + indicators = ((InternalDatasetDetails) dataset.getDatasetDetails()).getKeySourceIndicator(); + } + for (int i = 0; i < numPrimaryKeys; i++) { + IAType keyType = (indicators == null || indicators.get(i) == 0) + ? recordType.getSubFieldType(primaryKeyFields.get(i)) + : metaType.getSubFieldType(primaryKeyFields.get(i)); + primaryComparatorFactories[i] = storageComponentProvider.getComparatorFactoryProvider() + .getBinaryComparatorFactory(keyType, true); + primaryTypeTraits[i] = storageComponentProvider.getTypeTraitProvider().getTypeTrait(keyType); + } + primaryTypeTraits[numPrimaryKeys] = + storageComponentProvider.getTypeTraitProvider().getTypeTrait(recordType); + if (dataset.hasMetaPart()) { + primaryTypeTraits[numPrimaryKeys + 1] = + storageComponentProvider.getTypeTraitProvider().getTypeTrait(recordType); + } + } + boolean isPointMBR = spatialType.getTypeTag() == ATypeTag.POINT || spatialType.getTypeTag() == ATypeTag.POINT3D; + int numDimensions = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag()); + int numNestedSecondaryKeyFields = numDimensions * 2; + IBinaryComparatorFactory[] secondaryComparatorFactories = + new IBinaryComparatorFactory[numNestedSecondaryKeyFields]; + IPrimitiveValueProviderFactory[] valueProviderFactories = 
+ new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields]; + ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numNestedSecondaryKeyFields + numPrimaryKeys]; + IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag()); + ATypeTag keyType = nestedKeyType.getTypeTag(); + for (int i = 0; i < numNestedSecondaryKeyFields; i++) { + secondaryComparatorFactories[i] = storageComponentProvider.getComparatorFactoryProvider() + .getBinaryComparatorFactory(nestedKeyType, true); + secondaryTypeTraits[i] = storageComponentProvider.getTypeTraitProvider().getTypeTrait(nestedKeyType); + valueProviderFactories[i] = storageComponentProvider.getPrimitiveValueProviderFactory(); + + } + for (int i = 0; i < numPrimaryKeys; i++) { + secondaryTypeTraits[numNestedSecondaryKeyFields + i] = (dataset.getDatasetType() == DatasetType.INTERNAL) + ? primaryTypeTraits[i] : IndexingConstants.getTypeTraits(i); + } + int[] rtreeFields = null; + if (filterTypeTraits != null && filterTypeTraits.length > 0) { + rtreeFields = new int[numNestedSecondaryKeyFields + numPrimaryKeys]; + for (int i = 0; i < rtreeFields.length; i++) { + rtreeFields[i] = i; + } + } + IStorageManager storageManager = storageComponentProvider.getStorageManager(); + ILSMOperationTrackerFactory opTrackerFactory = dataset.getIndexOperationTrackerFactory(index); + ILSMIOOperationCallbackFactory ioOpCallbackFactory = dataset.getIoOperationCallbackFactory(index); + IMetadataPageManagerFactory metadataPageManagerFactory = + storageComponentProvider.getMetadataPageManagerFactory(); + ILSMIOOperationSchedulerProvider ioSchedulerProvider = + storageComponentProvider.getIoOperationSchedulerProvider(); + boolean durable = !dataset.isTemp(); + ILinearizeComparatorFactory linearizeCmpFactory = + MetadataProvider.proposeLinearizer(keyType, secondaryComparatorFactories.length); + ITypeTraits[] typeTraits = getTypeTraits(mdProvider, dataset, index, recordType, metaType); + IBinaryComparatorFactory[] 
rtreeCmpFactories = getCmpFactories(mdProvider, index, recordType, metaType); + int[] secondaryFilterFields = (filterTypeTraits != null && filterTypeTraits.length > 0) + ? new int[] { numNestedSecondaryKeyFields + numPrimaryKeys } : null; + IBinaryComparatorFactory[] btreeCompFactories = + dataset.getDatasetType() == DatasetType.EXTERNAL ? IndexingConstants.getBuddyBtreeComparatorFactories() + : getComparatorFactoriesForDeletedKeyBTree(secondaryTypeTraits, primaryComparatorFactories, + secondaryComparatorFactories); + if (dataset.getDatasetType() == DatasetType.INTERNAL) { + AsterixVirtualBufferCacheProvider vbcProvider = + new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()); + return new LSMRTreeWithAntiMatterLocalResourceFactory(storageManager, typeTraits, rtreeCmpFactories, + filterTypeTraits, filterCmpFactories, secondaryFilterFields, opTrackerFactory, ioOpCallbackFactory, + metadataPageManagerFactory, vbcProvider, ioSchedulerProvider, mergePolicyFactory, + mergePolicyProperties, durable, valueProviderFactories, rTreePolicyType, linearizeCmpFactory, + rtreeFields, isPointMBR, btreeCompFactories); + } else { + return new ExternalRTreeLocalResourceFactory(storageManager, typeTraits, rtreeCmpFactories, + filterTypeTraits, filterCmpFactories, secondaryFilterFields, opTrackerFactory, ioOpCallbackFactory, + metadataPageManagerFactory, ioSchedulerProvider, mergePolicyFactory, mergePolicyProperties, durable, + btreeCompFactories, valueProviderFactories, rTreePolicyType, linearizeCmpFactory, rtreeFields, + new int[] { numNestedSecondaryKeyFields }, isPointMBR, + mdProvider.getStorageProperties().getBloomFilterFalsePositiveRate()); + } + } + + private static IBinaryComparatorFactory[] getComparatorFactoriesForDeletedKeyBTree( + ITypeTraits[] secondaryTypeTraits, IBinaryComparatorFactory[] primaryComparatorFactories, + IBinaryComparatorFactory[] secondaryComparatorFactories) { + IBinaryComparatorFactory[] btreeCompFactories = new 
IBinaryComparatorFactory[secondaryTypeTraits.length]; + int i = 0; + for (; i < secondaryComparatorFactories.length; i++) { + btreeCompFactories[i] = secondaryComparatorFactories[i]; + } + for (int j = 0; i < secondaryTypeTraits.length; i++, j++) { + btreeCompFactories[i] = primaryComparatorFactories[j]; + } + return btreeCompFactories; + } + + private static ITypeTraits[] getTypeTraits(MetadataProvider metadataProvider, Dataset dataset, Index index, + ARecordType recordType, ARecordType metaType) throws AlgebricksException { + ITypeTraitProvider ttProvider = metadataProvider.getStorageComponentProvider().getTypeTraitProvider(); + List<List<String>> secondaryKeyFields = index.getKeyFieldNames(); + int numSecondaryKeys = secondaryKeyFields.size(); + int numPrimaryKeys = dataset.getPrimaryKeys().size(); + ITypeTraits[] primaryTypeTraits = dataset.getPrimaryTypeTraits(metadataProvider, recordType, metaType); + if (numSecondaryKeys != 1) { + throw new AsterixException("Cannot use " + numSecondaryKeys + " fields as a key for the R-tree index. 
" + + "There can be only one field as a key for the R-tree index."); + } + ARecordType sourceType; + List<Integer> keySourceIndicators = index.getKeyFieldSourceIndicators(); + if (keySourceIndicators == null || keySourceIndicators.get(0) == 0) { + sourceType = recordType; + } else { + sourceType = metaType; + } + Pair<IAType, Boolean> spatialTypePair = Index.getNonNullableOpenFieldType(index.getKeyFieldTypes().get(0), + secondaryKeyFields.get(0), sourceType); + IAType spatialType = spatialTypePair.first; + if (spatialType == null) { + throw new AsterixException("Could not find field " + secondaryKeyFields.get(0) + " in the schema."); + } + int numDimensions = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag()); + int numNestedSecondaryKeyFields = numDimensions * 2; + ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numNestedSecondaryKeyFields + numPrimaryKeys]; + IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag()); + for (int i = 0; i < numNestedSecondaryKeyFields; i++) { + secondaryTypeTraits[i] = ttProvider.getTypeTrait(nestedKeyType); + } + for (int i = 0; i < numPrimaryKeys; i++) { + secondaryTypeTraits[numNestedSecondaryKeyFields + i] = primaryTypeTraits[i]; + } + return secondaryTypeTraits; + } + + private static IBinaryComparatorFactory[] getCmpFactories(MetadataProvider metadataProvider, Index index, + ARecordType recordType, ARecordType metaType) throws AlgebricksException { + IBinaryComparatorFactoryProvider cmpFactoryProvider = + metadataProvider.getStorageComponentProvider().getComparatorFactoryProvider(); + List<List<String>> secondaryKeyFields = index.getKeyFieldNames(); + int numSecondaryKeys = secondaryKeyFields.size(); + if (numSecondaryKeys != 1) { + throw new AsterixException("Cannot use " + numSecondaryKeys + " fields as a key for the R-tree index. 
" + + "There can be only one field as a key for the R-tree index."); + } + List<Integer> keySourceIndicators = index.getKeyFieldSourceIndicators(); + ARecordType sourceType; + if (keySourceIndicators == null || keySourceIndicators.get(0) == 0) { + sourceType = recordType; + } else { + sourceType = metaType; + } + Pair<IAType, Boolean> spatialTypePair = Index.getNonNullableOpenFieldType(index.getKeyFieldTypes().get(0), + secondaryKeyFields.get(0), sourceType); + IAType spatialType = spatialTypePair.first; + if (spatialType == null) { + throw new AsterixException("Could not find field " + secondaryKeyFields.get(0) + " in the schema."); + } + IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag()); + int numDimensions = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag()); + int numNestedSecondaryKeyFields = numDimensions * 2; + IBinaryComparatorFactory[] secondaryComparatorFactories = + new IBinaryComparatorFactory[numNestedSecondaryKeyFields]; + for (int i = 0; i < numNestedSecondaryKeyFields; i++) { + secondaryComparatorFactories[i] = cmpFactoryProvider.getBinaryComparatorFactory(nestedKeyType, true); + } + return secondaryComparatorFactories; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java index faea7fd..1f7914d 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java @@ -22,7 
+22,6 @@ import java.util.List; import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.config.GlobalConfig; -import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.external.indexing.IndexingConstants; import org.apache.asterix.external.operators.ExternalScanOperatorDescriptor; import org.apache.asterix.metadata.declared.MetadataProvider; @@ -31,10 +30,6 @@ import org.apache.asterix.metadata.entities.Index; import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.IAType; import org.apache.asterix.runtime.utils.RuntimeUtils; -import org.apache.asterix.transaction.management.resource.ExternalBTreeWithBuddyLocalResourceMetadataFactory; -import org.apache.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadataFactory; -import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy; @@ -53,16 +48,13 @@ import org.apache.hyracks.api.dataflow.value.ITypeTraits; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor; +import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor; import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor; import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor; import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor; -import org.apache.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor; import 
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; +import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory; import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor; -import org.apache.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor; -import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; -import org.apache.hyracks.storage.common.file.ILocalResourceFactoryProvider; -import org.apache.hyracks.storage.common.file.LocalResource; public class SecondaryBTreeOperationsHelper extends SecondaryTreeIndexOperationsHelper { @@ -73,57 +65,12 @@ public class SecondaryBTreeOperationsHelper extends SecondaryTreeIndexOperations } @Override - public JobSpecification buildCreationJobSpec() throws AlgebricksException { - JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext()); - ILocalResourceFactoryProvider localResourceFactoryProvider; - IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory(metadataProvider, - index, itemType, metaType, mergePolicyFactory, mergePolicyFactoryProperties); - IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider(); - if (dataset.getDatasetType() == DatasetType.INTERNAL) { - //prepare a LocalResourceMetadata which will be stored in NC's local resource repository - LSMBTreeLocalResourceMetadataFactory localResourceMetadata = new LSMBTreeLocalResourceMetadataFactory( - secondaryTypeTraits, secondaryComparatorFactories, secondaryBloomFilterKeyFields, false, - dataset.getDatasetId(), mergePolicyFactory, mergePolicyFactoryProperties, filterTypeTraits, - filterCmpFactories, secondaryBTreeFields, secondaryFilterFields, - dataset.getIndexOperationTrackerFactory(index), dataset.getIoOperationCallbackFactory(index), - storageComponentProvider.getMetadataPageManagerFactory()); - localResourceFactoryProvider = - new 
PersistentLocalResourceFactoryProvider(localResourceMetadata, LocalResource.LSMBTreeResource); - } else { - // External dataset local resource and dataflow helper - int[] buddyBreeFields = new int[] { index.getKeyFieldNames().size() }; - ExternalBTreeWithBuddyLocalResourceMetadataFactory localResourceMetadata = - new ExternalBTreeWithBuddyLocalResourceMetadataFactory(dataset.getDatasetId(), - secondaryComparatorFactories, secondaryTypeTraits, mergePolicyFactory, - mergePolicyFactoryProperties, buddyBreeFields, - dataset.getIndexOperationTrackerFactory(index), - dataset.getIoOperationCallbackFactory(index), - storageComponentProvider.getMetadataPageManagerFactory()); - localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(localResourceMetadata, - LocalResource.ExternalBTreeWithBuddyResource); - } - TreeIndexCreateOperatorDescriptor secondaryIndexCreateOp = - new TreeIndexCreateOperatorDescriptor(spec, storageComponentProvider.getStorageManager(), - storageComponentProvider.getIndexLifecycleManagerProvider(), secondaryFileSplitProvider, - secondaryTypeTraits, secondaryComparatorFactories, secondaryBloomFilterKeyFields, - indexDataflowHelperFactory, - localResourceFactoryProvider, dataset.getModificationCallbackFactory(storageComponentProvider, - index, null, IndexOperation.CREATE, null), - storageComponentProvider.getMetadataPageManagerFactory()); - AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryIndexCreateOp, - secondaryPartitionConstraint); - spec.addRoot(secondaryIndexCreateOp); - spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy()); - return spec; - } - - @Override public JobSpecification buildLoadingJobSpec() throws AlgebricksException { JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext()); boolean isEnforcingKeyTypes = index.isEnforcingKeyFileds(); int[] fieldPermutation = 
createFieldPermutationForBulkLoadOp(index.getKeyFieldNames().size()); - IIndexDataflowHelperFactory dataflowHelperFactory = dataset.getIndexDataflowHelperFactory(metadataProvider, - index, itemType, metaType, mergePolicyFactory, mergePolicyFactoryProperties); + IIndexDataflowHelperFactory dataflowHelperFactory = new IndexDataflowHelperFactory( + metadataProvider.getStorageComponentProvider().getStorageManager(), secondaryFileSplitProvider); if (dataset.getDatasetType() == DatasetType.EXTERNAL) { /* * In case of external data, @@ -152,23 +99,22 @@ public class SecondaryBTreeOperationsHelper extends SecondaryTreeIndexOperations // Sort by secondary keys. ExternalSortOperatorDescriptor sortOp = createSortOp(spec, secondaryComparatorFactories, secondaryRecDesc); // Create secondary BTree bulk load op. - AbstractTreeIndexOperatorDescriptor secondaryBulkLoadOp; + AbstractSingleActivityOperatorDescriptor secondaryBulkLoadOp; IOperatorDescriptor root; if (externalFiles != null) { // Transaction load secondaryBulkLoadOp = createExternalIndexBulkModifyOp(spec, fieldPermutation, dataflowHelperFactory, GlobalConfig.DEFAULT_TREE_FILL_FACTOR); - root = secondaryBulkLoadOp; } else { // Initial load - secondaryBulkLoadOp = createTreeIndexBulkLoadOp(spec, fieldPermutation, dataflowHelperFactory, + secondaryBulkLoadOp = createExternalIndexBulkLoadOp(spec, fieldPermutation, dataflowHelperFactory, GlobalConfig.DEFAULT_TREE_FILL_FACTOR); - AlgebricksMetaOperatorDescriptor metaOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0, - new IPushRuntimeFactory[] { new SinkRuntimeFactory() }, - new RecordDescriptor[] { secondaryRecDesc }); - spec.connect(new OneToOneConnectorDescriptor(spec), secondaryBulkLoadOp, 0, metaOp, 0); - root = metaOp; } + AlgebricksMetaOperatorDescriptor metaOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0, + new IPushRuntimeFactory[] { new SinkRuntimeFactory() }, + new RecordDescriptor[] { secondaryRecDesc }); + spec.connect(new 
OneToOneConnectorDescriptor(spec), secondaryBulkLoadOp, 0, metaOp, 0); + root = metaOp; spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, asterixAssignOp, 0); if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) { spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java index e5b5b9f..fa519e1 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java @@ -34,17 +34,18 @@ import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType; import org.apache.asterix.common.transactions.JobId; import org.apache.asterix.external.indexing.ExternalFile; import org.apache.asterix.external.indexing.IndexingConstants; +import org.apache.asterix.external.operators.ExternalIndexBulkLoadOperatorDescriptor; import org.apache.asterix.external.operators.ExternalIndexBulkModifyOperatorDescriptor; import org.apache.asterix.external.operators.ExternalScanOperatorDescriptor; import org.apache.asterix.formats.nontagged.BinaryBooleanInspector; import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider; import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; import org.apache.asterix.formats.nontagged.TypeTraitProvider; -import org.apache.asterix.metadata.MetadataManager; import org.apache.asterix.metadata.declared.MetadataProvider; import 
org.apache.asterix.metadata.entities.Dataset; import org.apache.asterix.metadata.entities.Index; import org.apache.asterix.metadata.entities.InternalDatasetDetails; +import org.apache.asterix.metadata.lock.ExternalDatasetsRegistry; import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.IAType; import org.apache.asterix.runtime.evaluators.functions.AndDescriptor; @@ -52,7 +53,6 @@ import org.apache.asterix.runtime.evaluators.functions.CastTypeDescriptor; import org.apache.asterix.runtime.evaluators.functions.IsUnknownDescriptor; import org.apache.asterix.runtime.evaluators.functions.NotDescriptor; import org.apache.asterix.runtime.job.listener.JobEventListenerFactory; -import org.apache.asterix.runtime.utils.RuntimeComponentsProvider; import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory; import org.apache.asterix.transaction.management.service.transaction.JobIdFactory; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; @@ -83,6 +83,7 @@ import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor; import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor; import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory; import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; +import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory; import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor; import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; @@ -117,7 +118,7 @@ public abstract class SecondaryIndexOperationsHelper { protected RecordDescriptor secondaryRecDesc; protected IScalarEvaluatorFactory[] secondaryFieldAccessEvalFactories; protected ILSMMergePolicyFactory mergePolicyFactory; - protected Map<String, String> 
mergePolicyFactoryProperties; + protected Map<String, String> mergePolicyProperties; protected RecordDescriptor enforcedRecDesc; protected int numFilterFields; protected List<String> filterFieldName; @@ -132,8 +133,8 @@ public abstract class SecondaryIndexOperationsHelper { // Prevent public construction. Should be created via createIndexCreator(). protected SecondaryIndexOperationsHelper(Dataset dataset, Index index, PhysicalOptimizationConfig physOptConf, - MetadataProvider metadataProvider, ARecordType recType, - ARecordType metaType, ARecordType enforcedType, ARecordType enforcedMetaType) { + MetadataProvider metadataProvider, ARecordType recType, ARecordType metaType, ARecordType enforcedType, + ARecordType enforcedMetaType) { this.dataset = dataset; this.index = index; this.physOptConf = physOptConf; @@ -150,14 +151,12 @@ public abstract class SecondaryIndexOperationsHelper { SecondaryIndexOperationsHelper indexOperationsHelper; switch (index.getIndexType()) { case BTREE: - indexOperationsHelper = - new SecondaryBTreeOperationsHelper(dataset, index, physOptConf, - metadataProvider, recType, metaType, enforcedType, enforcedMetaType); + indexOperationsHelper = new SecondaryBTreeOperationsHelper(dataset, index, physOptConf, + metadataProvider, recType, metaType, enforcedType, enforcedMetaType); break; case RTREE: - indexOperationsHelper = - new SecondaryRTreeOperationsHelper(dataset, index, physOptConf, - metadataProvider, recType, metaType, enforcedType, enforcedMetaType); + indexOperationsHelper = new SecondaryRTreeOperationsHelper(dataset, index, physOptConf, + metadataProvider, recType, metaType, enforcedType, enforcedMetaType); break; case SINGLE_PARTITION_WORD_INVIX: case SINGLE_PARTITION_NGRAM_INVIX: @@ -189,7 +188,7 @@ public abstract class SecondaryIndexOperationsHelper { metadataProvider.getSplitProviderAndConstraints(dataset, index.getIndexName()); secondaryFileSplitProvider = secondarySplitsAndConstraint.first; secondaryPartitionConstraint = 
secondarySplitsAndConstraint.second; - numPrimaryKeys = DatasetUtil.getPartitioningKeys(dataset).size(); + numPrimaryKeys = dataset.getPrimaryKeys().size(); if (dataset.getDatasetType() == DatasetType.INTERNAL) { filterFieldName = DatasetUtil.getFilterField(dataset); if (filterFieldName != null) { @@ -208,7 +207,7 @@ public abstract class SecondaryIndexOperationsHelper { Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext()); mergePolicyFactory = compactionInfo.first; - mergePolicyFactoryProperties = compactionInfo.second; + mergePolicyProperties = compactionInfo.second; if (numFilterFields > 0) { setFilterTypeTraitsAndComparators(); } @@ -238,7 +237,7 @@ public abstract class SecondaryIndexOperationsHelper { protected abstract int getNumSecondaryKeys(); protected void setPrimaryRecDescAndComparators() throws AlgebricksException { - List<List<String>> partitioningKeys = DatasetUtil.getPartitioningKeys(dataset); + List<List<String>> partitioningKeys = dataset.getPrimaryKeys(); ISerializerDeserializer[] primaryRecFields = new ISerializerDeserializer[numPrimaryKeys + 1 + (dataset.hasMetaPart() ? 1 : 0)]; ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1 + (dataset.hasMetaPart() ? 
1 : 0)]; @@ -292,8 +291,7 @@ public abstract class SecondaryIndexOperationsHelper { return keyProviderOp; } - protected BTreeSearchOperatorDescriptor createPrimaryIndexScanOp(JobSpecification spec) - throws AlgebricksException { + protected BTreeSearchOperatorDescriptor createPrimaryIndexScanOp(JobSpecification spec) throws AlgebricksException { // -Infinity int[] lowKeyFields = null; // +Infinity @@ -304,22 +302,15 @@ public abstract class SecondaryIndexOperationsHelper { boolean isWriteTransaction = metadataProvider.isWriteTransaction(); IJobletEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(jobId, isWriteTransaction); spec.setJobletEventListenerFactory(jobEventListenerFactory); - Index primaryIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), - dataset.getDataverseName(), dataset.getDatasetName(), dataset.getDatasetName()); - boolean temp = dataset.getDatasetDetails().isTemp(); ISearchOperationCallbackFactory searchCallbackFactory = temp ? 
NoOpOperationCallbackFactory.INSTANCE : new PrimaryIndexInstantSearchOperationCallbackFactory(jobId, dataset.getDatasetId(), primaryBloomFilterKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE); - BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc, - RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER, - primaryFileSplitProvider, primaryRecDesc.getTypeTraits(), primaryComparatorFactories, - primaryBloomFilterKeyFields, lowKeyFields, highKeyFields, true, true, - dataset.getIndexDataflowHelperFactory(metadataProvider, primaryIndex, itemType, metaType, - mergePolicyFactory, mergePolicyFactoryProperties), - false, false, null, searchCallbackFactory, null, null, - metadataProvider.getStorageComponentProvider().getMetadataPageManagerFactory()); - + IndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory( + metadataProvider.getStorageComponentProvider().getStorageManager(), primaryFileSplitProvider); + BTreeSearchOperatorDescriptor primarySearchOp = + new BTreeSearchOperatorDescriptor(spec, primaryRecDesc, lowKeyFields, highKeyFields, true, true, + indexHelperFactory, false, false, null, searchCallbackFactory, null, null, false); AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, primarySearchOp, primaryPartitionConstraint); return primarySearchOp; @@ -401,11 +392,18 @@ public abstract class SecondaryIndexOperationsHelper { int[] fieldPermutation, IIndexDataflowHelperFactory dataflowHelperFactory, float fillFactor) throws AlgebricksException { TreeIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(spec, - secondaryRecDesc, RuntimeComponentsProvider.RUNTIME_PROVIDER, - RuntimeComponentsProvider.RUNTIME_PROVIDER, secondaryFileSplitProvider, - secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories, secondaryBloomFilterKeyFields, - fieldPermutation, fillFactor, false, numElementsHint, 
false, dataflowHelperFactory, - metadataProvider.getStorageComponentProvider().getMetadataPageManagerFactory()); + secondaryRecDesc, fieldPermutation, fillFactor, false, numElementsHint, false, dataflowHelperFactory); + AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, treeIndexBulkLoadOp, + secondaryPartitionConstraint); + return treeIndexBulkLoadOp; + } + + protected TreeIndexBulkLoadOperatorDescriptor createExternalIndexBulkLoadOp(JobSpecification spec, + int[] fieldPermutation, IIndexDataflowHelperFactory dataflowHelperFactory, float fillFactor) + throws AlgebricksException { + ExternalIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new ExternalIndexBulkLoadOperatorDescriptor(spec, + secondaryRecDesc, fieldPermutation, fillFactor, false, numElementsHint, false, dataflowHelperFactory, + ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, metadataProvider)); AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, treeIndexBulkLoadOp, secondaryPartitionConstraint); return treeIndexBulkLoadOp; @@ -517,11 +515,7 @@ public abstract class SecondaryIndexOperationsHelper { } } ExternalIndexBulkModifyOperatorDescriptor treeIndexBulkLoadOp = new ExternalIndexBulkModifyOperatorDescriptor( - spec, RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER, - secondaryFileSplitProvider, secondaryTypeTraits, secondaryComparatorFactories, - secondaryBloomFilterKeyFields, dataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE, - deletedFiles, fieldPermutation, fillFactor, numElementsHint, - metadataProvider.getStorageComponentProvider().getMetadataPageManagerFactory()); + spec, dataflowHelperFactory, deletedFiles, fieldPermutation, fillFactor, false, numElementsHint, false); AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, treeIndexBulkLoadOp, secondaryPartitionConstraint); return treeIndexBulkLoadOp; 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java index 985f8cc..1bb1377 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java @@ -18,14 +18,10 @@ */ package org.apache.asterix.metadata.utils; -import java.util.Map; - import org.apache.asterix.common.config.DatasetConfig.IndexType; -import org.apache.asterix.common.context.IStorageComponentProvider; -import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.config.GlobalConfig; import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.exceptions.ErrorCode; -import org.apache.asterix.common.transactions.IResourceFactory; import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.metadata.entities.Dataset; import org.apache.asterix.metadata.entities.Index; @@ -35,9 +31,6 @@ import org.apache.asterix.om.utils.NonTaggedFormatUtil; import org.apache.asterix.om.utils.RecordUtil; import org.apache.asterix.runtime.formats.FormatUtils; import org.apache.asterix.runtime.utils.RuntimeUtils; -import org.apache.asterix.transaction.management.resource.LSMInvertedIndexLocalResourceMetadataFactory; -import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider; -import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.utils.Pair; @@ -58,23 +51,16 @@ import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory; import org.apache.hyracks.data.std.primitive.ShortPointable; import org.apache.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer; import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor; +import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor; import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor; -import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor; import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor; import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; -import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; +import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory; import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor; -import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexBulkLoadOperatorDescriptor; -import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexCompactOperator; -import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexCreateOperatorDescriptor; -import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexDropOperatorDescriptor; import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory; -import 
org.apache.hyracks.storage.common.file.ILocalResourceFactoryProvider; -import org.apache.hyracks.storage.common.file.LocalResource; -public class SecondaryInvertedIndexOperationsHelper extends SecondaryIndexOperationsHelper { +public class SecondaryInvertedIndexOperationsHelper extends SecondaryTreeIndexOperationsHelper { private IAType secondaryKeyType; private ITypeTraits[] invListsTypeTraits; @@ -219,36 +205,6 @@ public class SecondaryInvertedIndexOperationsHelper extends SecondaryIndexOperat } @Override - public JobSpecification buildCreationJobSpec() throws AlgebricksException { - JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext()); - IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider(); - //prepare a LocalResourceMetadata which will be stored in NC's local resource repository - IResourceFactory localResourceMetadata = new LSMInvertedIndexLocalResourceMetadataFactory(invListsTypeTraits, - primaryComparatorFactories, tokenTypeTraits, tokenComparatorFactories, tokenizerFactory, isPartitioned, - dataset.getDatasetId(), mergePolicyFactory, mergePolicyFactoryProperties, filterTypeTraits, - filterCmpFactories, invertedIndexFields, secondaryFilterFields, secondaryFilterFieldsForNonBulkLoadOps, - invertedIndexFieldsForNonBulkLoadOps, dataset.getIndexOperationTrackerFactory(index), - dataset.getIoOperationCallbackFactory(index), storageComponentProvider.getMetadataPageManagerFactory()); - ILocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider( - localResourceMetadata, LocalResource.LSMInvertedIndexResource); - - IIndexDataflowHelperFactory dataflowHelperFactory = createDataflowHelperFactory(); - LSMInvertedIndexCreateOperatorDescriptor invIndexCreateOp = - new LSMInvertedIndexCreateOperatorDescriptor(spec, storageComponentProvider.getStorageManager(), - secondaryFileSplitProvider, 
storageComponentProvider.getIndexLifecycleManagerProvider(), - tokenTypeTraits, tokenComparatorFactories, invListsTypeTraits, primaryComparatorFactories, - tokenizerFactory, dataflowHelperFactory, - localResourceFactoryProvider, dataset.getModificationCallbackFactory(storageComponentProvider, - index, null, IndexOperation.CREATE, null), - storageComponentProvider.getMetadataPageManagerFactory()); - AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, invIndexCreateOp, - secondaryPartitionConstraint); - spec.addRoot(invIndexCreateOp); - spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy()); - return spec; - } - - @Override public JobSpecification buildLoadingJobSpec() throws AlgebricksException { JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext()); @@ -282,7 +238,9 @@ public class SecondaryInvertedIndexOperationsHelper extends SecondaryIndexOperat createSortOp(spec, tokenKeyPairComparatorFactories, tokenKeyPairRecDesc); // Create secondary inverted index bulk load op. 
- LSMInvertedIndexBulkLoadOperatorDescriptor invIndexBulkLoadOp = createInvertedIndexBulkLoadOp(spec); + AbstractSingleActivityOperatorDescriptor invIndexBulkLoadOp = createInvertedIndexBulkLoadOp(spec); + AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, invIndexBulkLoadOp, + secondaryPartitionConstraint); AlgebricksMetaOperatorDescriptor metaOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0, new IPushRuntimeFactory[] { new SinkRuntimeFactory() }, new RecordDescriptor[] {}); @@ -331,71 +289,15 @@ public class SecondaryInvertedIndexOperationsHelper extends SecondaryIndexOperat return sortOp; } - private LSMInvertedIndexBulkLoadOperatorDescriptor createInvertedIndexBulkLoadOp(JobSpecification spec) + private AbstractSingleActivityOperatorDescriptor createInvertedIndexBulkLoadOp(JobSpecification spec) throws AlgebricksException { int[] fieldPermutation = new int[numTokenKeyPairFields + numFilterFields]; for (int i = 0; i < fieldPermutation.length; i++) { fieldPermutation[i] = i; } - IIndexDataflowHelperFactory dataflowHelperFactory = createDataflowHelperFactory(); - IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider(); - LSMInvertedIndexBulkLoadOperatorDescriptor invIndexBulkLoadOp = new LSMInvertedIndexBulkLoadOperatorDescriptor( - spec, secondaryRecDesc, fieldPermutation, false, numElementsHint, false, - storageComponentProvider.getStorageManager(), secondaryFileSplitProvider, - storageComponentProvider.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories, - invListsTypeTraits, primaryComparatorFactories, tokenizerFactory, dataflowHelperFactory, - storageComponentProvider.getMetadataPageManagerFactory()); - AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, invIndexBulkLoadOp, - secondaryPartitionConstraint); - return invIndexBulkLoadOp; - } - - private IIndexDataflowHelperFactory createDataflowHelperFactory() throws AlgebricksException { - 
return dataset.getIndexDataflowHelperFactory(metadataProvider, index, itemType, metaType, mergePolicyFactory, - mergePolicyFactoryProperties); - } - - @Override - public JobSpecification buildCompactJobSpec() throws AsterixException, AlgebricksException { - JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext()); - IIndexDataflowHelperFactory dataflowHelperFactory = createDataflowHelperFactory(); - IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider(); - LSMInvertedIndexCompactOperator compactOp = - new LSMInvertedIndexCompactOperator(spec, storageComponentProvider.getStorageManager(), - secondaryFileSplitProvider, storageComponentProvider.getIndexLifecycleManagerProvider(), - tokenTypeTraits, tokenComparatorFactories, invListsTypeTraits, primaryComparatorFactories, - tokenizerFactory, dataflowHelperFactory, - dataset.getModificationCallbackFactory(storageComponentProvider, index, null, - IndexOperation.FULL_MERGE, null), - storageComponentProvider.getMetadataPageManagerFactory()); - AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, compactOp, - secondaryPartitionConstraint); - - spec.addRoot(compactOp); - spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy()); - return spec; - } - - @Override - public JobSpecification buildDropJobSpec() throws AlgebricksException { - JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext()); - IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider(); - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = - metadataProvider.getSplitProviderAndConstraints(dataset, index.getIndexName()); - Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = - DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext()); - ARecordType recordType = - (ARecordType) 
metadataProvider.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName()); - ARecordType metaType = DatasetUtil.getMetaType(metadataProvider, dataset); - IIndexDataflowHelperFactory dataflowHelperFactory = dataset.getIndexDataflowHelperFactory(metadataProvider, - index, recordType, metaType, compactionInfo.first, compactionInfo.second); - LSMInvertedIndexDropOperatorDescriptor invIdxDrop = - new LSMInvertedIndexDropOperatorDescriptor(spec, storageComponentProvider.getStorageManager(), - storageComponentProvider.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, - dataflowHelperFactory, storageComponentProvider.getMetadataPageManagerFactory()); - AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, invIdxDrop, - splitsAndConstraint.second); - spec.addRoot(invIdxDrop); - return spec; + IIndexDataflowHelperFactory dataflowHelperFactory = new IndexDataflowHelperFactory( + metadataProvider.getStorageComponentProvider().getStorageManager(), secondaryFileSplitProvider); + return createTreeIndexBulkLoadOp(spec, fieldPermutation, dataflowHelperFactory, + GlobalConfig.DEFAULT_TREE_FILL_FACTOR); } }