http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexOperatorDescriptor.java deleted file mode 100644 index 556fb51..0000000 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexOperatorDescriptor.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.asterix.external.operators; - -import java.util.List; - -import org.apache.asterix.external.indexing.ExternalFile; -import org.apache.asterix.external.indexing.FileIndexTupleTranslator; -import org.apache.asterix.external.indexing.FilesIndexDescription; -import org.apache.hyracks.api.comm.IFrameWriter; -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.dataflow.IOperatorNodePushable; -import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; -import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; -import org.apache.hyracks.dataflow.std.base.AbstractOperatorNodePushable; -import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; -import org.apache.hyracks.storage.am.btree.impls.BTree; -import org.apache.hyracks.storage.am.common.api.IIndex; -import org.apache.hyracks.storage.am.common.api.IIndexBulkLoader; -import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper; -import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider; -import org.apache.hyracks.storage.am.common.api.IMetadataPageManagerFactory; -import org.apache.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor; -import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; -import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory; -import org.apache.hyracks.storage.am.lsm.btree.impls.ExternalBTree; -import org.apache.hyracks.storage.am.lsm.btree.impls.ExternalBTree.LSMTwoPCBTreeBulkLoader; -import org.apache.hyracks.storage.common.IStorageManager; -import org.apache.hyracks.storage.common.file.ILocalResourceFactoryProvider; - -/** - * This operator is intended solely for external dataset files replicated index. 
- * It either create and bulkload when used for a new index - * or bulkmodify the index creating a hidden transaction component which later might be committed or deleted by another operator - * - * @author alamouda - */ -public class ExternalFilesIndexOperatorDescriptor extends AbstractTreeIndexOperatorDescriptor { - - private static final long serialVersionUID = 1L; - private boolean createNewIndex; - private List<ExternalFile> files; - - public ExternalFilesIndexOperatorDescriptor(IOperatorDescriptorRegistry spec, IStorageManager storageManager, - IIndexLifecycleManagerProvider lifecycleManagerProvider, IFileSplitProvider fileSplitProvider, - IIndexDataflowHelperFactory dataflowHelperFactory, - ILocalResourceFactoryProvider localResourceFactoryProvider, List<ExternalFile> files, - boolean createNewIndex, IMetadataPageManagerFactory metadataPageManagerFactory) { - super(spec, 0, 0, null, storageManager, lifecycleManagerProvider, fileSplitProvider, - new FilesIndexDescription().EXTERNAL_FILE_INDEX_TYPE_TRAITS, - FilesIndexDescription.FILES_INDEX_COMP_FACTORIES, FilesIndexDescription.BLOOM_FILTER_FIELDS, - dataflowHelperFactory, null, false, false, null, localResourceFactoryProvider, - NoOpOperationCallbackFactory.INSTANCE, NoOpOperationCallbackFactory.INSTANCE, - metadataPageManagerFactory); - this.createNewIndex = createNewIndex; - this.files = files; - } - - @Override - public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, - IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException { - final IIndexDataflowHelper indexHelper = - getIndexDataflowHelperFactory().createIndexDataflowHelper(this, ctx, partition); - return new AbstractOperatorNodePushable() { - - @SuppressWarnings("incomplete-switch") - @Override - public void initialize() throws HyracksDataException { - FileIndexTupleTranslator filesTupleTranslator = new FileIndexTupleTranslator(); - if (createNewIndex) { - // Create - 
indexHelper.create(); - // Open and get - indexHelper.open(); - try { - IIndex index = indexHelper.getIndexInstance(); - // Create bulk loader - IIndexBulkLoader bulkLoader = - index.createBulkLoader(BTree.DEFAULT_FILL_FACTOR, false, files.size(), false); - // Load files - for (ExternalFile file : files) { - bulkLoader.add(filesTupleTranslator.getTupleFromFile(file)); - } - bulkLoader.end(); - } finally { - indexHelper.close(); - } - } else { - ///////// Bulk modify ////////// - // Open and get - indexHelper.open(); - IIndex index = indexHelper.getIndexInstance(); - LSMTwoPCBTreeBulkLoader bulkLoader = null; - try { - bulkLoader = (LSMTwoPCBTreeBulkLoader) ((ExternalBTree) index) - .createTransactionBulkLoader(BTree.DEFAULT_FILL_FACTOR, false, files.size(), false); - // Load files - // The files must be ordered according to their numbers - for (ExternalFile file : files) { - switch (file.getPendingOp()) { - case ADD_OP: - case APPEND_OP: - bulkLoader.add(filesTupleTranslator.getTupleFromFile(file)); - break; - case DROP_OP: - bulkLoader.delete(filesTupleTranslator.getTupleFromFile(file)); - break; - } - } - bulkLoader.end(); - } catch (Exception e) { - if (bulkLoader != null) { - bulkLoader.abort(); - } - throw HyracksDataException.create(e); - } finally { - indexHelper.close(); - } - } - } - - @Override - public void deinitialize() throws HyracksDataException { - } - - @Override - public int getInputArity() { - return 0; - } - - @Override - public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) - throws HyracksDataException { - } - - @Override - public IFrameWriter getInputFrameWriter(int index) { - return null; - } - - }; - } - -}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorDescriptor.java new file mode 100644 index 0000000..83e4566 --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorDescriptor.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.external.operators; + +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.IOperatorNodePushable; +import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; +import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; +import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor; + +public class ExternalIndexBulkLoadOperatorDescriptor extends TreeIndexBulkLoadOperatorDescriptor { + + private static final long serialVersionUID = 1L; + private final int version; + + public ExternalIndexBulkLoadOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc, + int[] fieldPermutation, float fillFactor, boolean verifyInput, long numElementsHint, + boolean checkIfEmptyIndex, IIndexDataflowHelperFactory indexHelperFactory, int version) { + super(spec, outRecDesc, fieldPermutation, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, + indexHelperFactory); + this.version = version; + } + + @Override + public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, + IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException { + return new ExternalIndexBulkLoadOperatorNodePushable(indexHelperFactory, ctx, partition, fieldPermutation, + fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, + recordDescProvider.getInputRecordDescriptor(this.getActivityId(), 0), version); + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorNodePushable.java ---------------------------------------------------------------------- diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorNodePushable.java new file mode 100644 index 0000000..573de5d --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorNodePushable.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.external.operators; + +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; +import org.apache.hyracks.storage.am.common.dataflow.IndexBulkLoadOperatorNodePushable; +import org.apache.hyracks.storage.am.lsm.common.api.ITwoPCIndex; + +public class ExternalIndexBulkLoadOperatorNodePushable extends IndexBulkLoadOperatorNodePushable { + + private final int version; + + public ExternalIndexBulkLoadOperatorNodePushable(IIndexDataflowHelperFactory indexDataflowHelperFactory, + IHyracksTaskContext ctx, int partition, int[] fieldPermutation, float fillFactor, boolean verifyInput, + long numElementsHint, boolean checkIfEmptyIndex, RecordDescriptor recDesc, int version) + throws HyracksDataException { + super(indexDataflowHelperFactory, ctx, partition, fieldPermutation, fillFactor, verifyInput, numElementsHint, + checkIfEmptyIndex, recDesc); + this.version = version; + } + + @Override + public void open() throws HyracksDataException { + super.open(); + ((ITwoPCIndex) index).setCurrentVersion(version); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorDescriptor.java index a994c1f..52230e0 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorDescriptor.java +++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorDescriptor.java @@ -20,52 +20,31 @@ package org.apache.asterix.external.operators; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.IOperatorNodePushable; -import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; -import org.apache.hyracks.api.dataflow.value.ITypeTraits; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; -import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; -import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider; -import org.apache.hyracks.storage.am.common.api.IMetadataPageManagerFactory; -import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory; -import org.apache.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor; import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; -import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory; -import org.apache.hyracks.storage.common.IStorageManager; -import org.apache.hyracks.storage.common.file.NoOpLocalResourceFactoryProvider; +import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor; -public class ExternalIndexBulkModifyOperatorDescriptor extends AbstractTreeIndexOperatorDescriptor { +public class ExternalIndexBulkModifyOperatorDescriptor extends TreeIndexBulkLoadOperatorDescriptor { private static final long serialVersionUID = 1L; private final int[] deletedFiles; - private final int[] fieldPermutation; - private final float fillFactor; - private final long numElementsHint; public ExternalIndexBulkModifyOperatorDescriptor(IOperatorDescriptorRegistry spec, - IStorageManager storageManager, IIndexLifecycleManagerProvider 
lifecycleManagerProvider, - IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits, - IBinaryComparatorFactory[] comparatorFactories, int[] bloomFilterKeyFields, - IIndexDataflowHelperFactory dataflowHelperFactory, - IModificationOperationCallbackFactory modificationOpCallbackFactory, int[] deletedFiles, - int[] fieldPermutation, float fillFactor, long numElementsHint, - IMetadataPageManagerFactory metadataPageManagerFactory) { - super(spec, 1, 0, null, storageManager, lifecycleManagerProvider, fileSplitProvider, typeTraits, - comparatorFactories, bloomFilterKeyFields, dataflowHelperFactory, null, false, false, null, - NoOpLocalResourceFactoryProvider.INSTANCE, NoOpOperationCallbackFactory.INSTANCE, - modificationOpCallbackFactory, metadataPageManagerFactory); + IIndexDataflowHelperFactory dataflowHelperFactory, int[] deletedFiles, int[] fieldPermutation, + float fillFactor, boolean verifyInput, long numElementsHint, boolean checkIfEmpty) { + super(spec, null, fieldPermutation, fillFactor, verifyInput, numElementsHint, checkIfEmpty, + dataflowHelperFactory); this.deletedFiles = deletedFiles; - this.fieldPermutation = fieldPermutation; - this.fillFactor = fillFactor; - this.numElementsHint = numElementsHint; } @Override public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException { - return new ExternalIndexBulkModifyOperatorNodePushable(this, ctx, partition, fieldPermutation, fillFactor, - numElementsHint, recordDescProvider, deletedFiles); + return new ExternalIndexBulkModifyOperatorNodePushable(indexHelperFactory, ctx, partition, fieldPermutation, + fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, + recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), deletedFiles); } } 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java index d3bd2bb..e47bb07 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java @@ -23,29 +23,30 @@ import java.nio.ByteBuffer; import org.apache.asterix.external.indexing.FilesIndexDescription; import org.apache.asterix.om.base.AMutableInt32; import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; import org.apache.hyracks.storage.am.common.api.ITwoPCIndexBulkLoader; +import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; import org.apache.hyracks.storage.am.common.dataflow.IndexBulkLoadOperatorNodePushable; import org.apache.hyracks.storage.am.lsm.common.api.ITwoPCIndex; public class ExternalIndexBulkModifyOperatorNodePushable extends IndexBulkLoadOperatorNodePushable { - private final FilesIndexDescription filesIndexDescription = new FilesIndexDescription(); private 
final int[] deletedFiles; private ArrayTupleBuilder buddyBTreeTupleBuilder = - new ArrayTupleBuilder(filesIndexDescription.FILE_BUDDY_BTREE_RECORD_DESCRIPTOR.getFieldCount()); + new ArrayTupleBuilder(FilesIndexDescription.FILE_BUDDY_BTREE_RECORD_DESCRIPTOR.getFieldCount()); private AMutableInt32 fileNumber = new AMutableInt32(0); private ArrayTupleReference deleteTuple = new ArrayTupleReference(); - public ExternalIndexBulkModifyOperatorNodePushable(ExternalIndexBulkModifyOperatorDescriptor opDesc, - IHyracksTaskContext ctx, int partition, int[] fieldPermutation, float fillFactor, long numElementsHint, - IRecordDescriptorProvider recordDescProvider, int[] deletedFiles) throws HyracksDataException { - super(opDesc, ctx, partition, fieldPermutation, fillFactor, false, numElementsHint, false, recordDescProvider); + public ExternalIndexBulkModifyOperatorNodePushable(IIndexDataflowHelperFactory indexHelperFactory, + IHyracksTaskContext ctx, int partition, int[] fieldPermutation, float fillFactor, boolean verifyInput, + long numElementsHint, boolean checkIfEmpty, RecordDescriptor inputRecDesc, int[] deletedFiles) + throws HyracksDataException { + super(indexHelperFactory, ctx, partition, fieldPermutation, fillFactor, verifyInput, numElementsHint, + checkIfEmpty, inputRecDesc); this.deletedFiles = deletedFiles; } @@ -54,7 +55,6 @@ public class ExternalIndexBulkModifyOperatorNodePushable extends IndexBulkLoadOp // It uses the bulkLoader to insert delete tuples for the deleted files @Override public void open() throws HyracksDataException { - RecordDescriptor recDesc = recDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0); accessor = new FrameTupleAccessor(recDesc); indexHelper.open(); index = indexHelper.getIndexInstance(); @@ -66,7 +66,7 @@ public class ExternalIndexBulkModifyOperatorNodePushable extends IndexBulkLoadOp // Delete files for (int i = 0; i < deletedFiles.length; i++) { fileNumber.setValue(deletedFiles[i]); - 
filesIndexDescription.getBuddyBTreeTupleFromFileNumber(deleteTuple, buddyBTreeTupleBuilder, fileNumber); + FilesIndexDescription.getBuddyBTreeTupleFromFileNumber(deleteTuple, buddyBTreeTupleBuilder, fileNumber); ((ITwoPCIndexBulkLoader) bulkLoader).delete(deleteTuple); } } catch (Throwable e) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java index d4718a9..20744bc 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java @@ -23,43 +23,35 @@ import java.nio.ByteBuffer; import org.apache.asterix.external.adapter.factory.LookupAdapterFactory; import org.apache.asterix.external.dataset.adapter.LookupAdapter; import org.apache.asterix.external.indexing.ExternalFileIndexAccessor; -import org.apache.asterix.external.indexing.FilesIndexDescription; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.IOperatorNodePushable; -import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory; import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; +import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor; import 
org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable; -import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; -import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider; -import org.apache.hyracks.storage.am.common.api.IPageManagerFactory; import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory; -import org.apache.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor; import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; -import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeDataflowHelper; -import org.apache.hyracks.storage.common.IStorageManager; /* * This operator is intended for using record ids to access data in external sources */ -public class ExternalLookupOperatorDescriptor extends AbstractTreeIndexOperatorDescriptor { +public class ExternalLookupOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor { private static final long serialVersionUID = 1L; private final LookupAdapterFactory<?> adapterFactory; + private final IIndexDataflowHelperFactory dataflowHelperFactory; + private final int version; + private final ISearchOperationCallbackFactory searchOpCallbackFactory; public ExternalLookupOperatorDescriptor(IOperatorDescriptorRegistry spec, LookupAdapterFactory<?> adapterFactory, - RecordDescriptor outRecDesc, IIndexDataflowHelperFactory externalFilesIndexDataFlowHelperFactory, - boolean propagateInput, IIndexLifecycleManagerProvider lcManagerProvider, - IStorageManager storageManager, IFileSplitProvider fileSplitProvider, - ISearchOperationCallbackFactory searchOpCallbackFactory, - boolean retainMissing, IMissingWriterFactory missingWriterFactory, - IPageManagerFactory pageManagerFactory) { - super(spec, 1, 1, outRecDesc, storageManager, lcManagerProvider, fileSplitProvider, - new FilesIndexDescription().EXTERNAL_FILE_INDEX_TYPE_TRAITS, - FilesIndexDescription.FILES_INDEX_COMP_FACTORIES, 
FilesIndexDescription.BLOOM_FILTER_FIELDS, - externalFilesIndexDataFlowHelperFactory, null, propagateInput, retainMissing, missingWriterFactory, - null, searchOpCallbackFactory, null, pageManagerFactory); + RecordDescriptor outRecDesc, IIndexDataflowHelperFactory dataflowHelperFactory, + ISearchOperationCallbackFactory searchOpCallbackFactory, int version) { + super(spec, 1, 1); + outRecDescs[0] = outRecDesc; + this.dataflowHelperFactory = dataflowHelperFactory; + this.searchOpCallbackFactory = searchOpCallbackFactory; + this.version = version; this.adapterFactory = adapterFactory; } @@ -68,10 +60,8 @@ public class ExternalLookupOperatorDescriptor extends AbstractTreeIndexOperatorD final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) throws HyracksDataException { // Create a file index accessor to be used for files lookup operations - // Note that all file index accessors will use partition 0 since we only have 1 files index per NC final ExternalFileIndexAccessor snapshotAccessor = new ExternalFileIndexAccessor( - (ExternalBTreeDataflowHelper) dataflowHelperFactory.createIndexDataflowHelper(this, ctx, partition), - this); + dataflowHelperFactory.create(ctx, partition), searchOpCallbackFactory, version); return new AbstractUnaryInputUnaryOutputOperatorNodePushable() { // The adapter that uses the file index along with the coming tuples to access files in HDFS private LookupAdapter<?> adapter; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalRTreeSearchOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalRTreeSearchOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalRTreeSearchOperatorDescriptor.java index 
22960e4..493d3ba 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalRTreeSearchOperatorDescriptor.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalRTreeSearchOperatorDescriptor.java @@ -20,41 +20,38 @@ package org.apache.asterix.external.operators; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.IOperatorNodePushable; -import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory; import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; -import org.apache.hyracks.api.dataflow.value.ITypeTraits; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; -import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; -import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider; -import org.apache.hyracks.storage.am.common.api.IPageManagerFactory; import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory; import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; import org.apache.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescriptor; -import org.apache.hyracks.storage.common.IStorageManager; public class ExternalRTreeSearchOperatorDescriptor extends RTreeSearchOperatorDescriptor { private static final long serialVersionUID = 1L; + private final int version; - public ExternalRTreeSearchOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor recDesc, - IStorageManager storageManager, IIndexLifecycleManagerProvider lifecycleManagerProvider, - IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits, - IBinaryComparatorFactory[] comparatorFactories, int[] keyFields, - IIndexDataflowHelperFactory 
dataflowHelperFactory, boolean retainInput, boolean retainMissing, - IMissingWriterFactory missingWriterFactory, ISearchOperationCallbackFactory searchOpCallbackFactory, - IPageManagerFactory pageManagerFactory) { - super(spec, recDesc, storageManager, lifecycleManagerProvider, fileSplitProvider, typeTraits, - comparatorFactories, keyFields, dataflowHelperFactory, retainInput, retainMissing, missingWriterFactory, - searchOpCallbackFactory, null, null, pageManagerFactory); + public ExternalRTreeSearchOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc, + int[] keyFields, boolean lowKeyInclusive, boolean highKeyInclusive, + IIndexDataflowHelperFactory indexHelperFactory, boolean retainInput, boolean retainMissing, + IMissingWriterFactory missingWriterFactory, ISearchOperationCallbackFactory searchCallbackFactory, + int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, int version) { + super(spec, outRecDesc, keyFields, lowKeyInclusive, highKeyInclusive, indexHelperFactory, retainInput, + retainMissing, missingWriterFactory, searchCallbackFactory, minFilterFieldIndexes, + maxFilterFieldIndexes, false); + this.version = version; } @Override public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException { - return new ExternalRTreeSearchOperatorNodePushable(this, ctx, partition, recordDescProvider, keyFields); + return new ExternalRTreeSearchOperatorNodePushable(ctx, partition, + recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), keyFields, minFilterFieldIndexes, + maxFilterFieldIndexes, indexHelperFactory, retainInput, retainMissing, missingWriterFactory, + searchCallbackFactory, version); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalRTreeSearchOperatorNodePushable.java 
---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalRTreeSearchOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalRTreeSearchOperatorNodePushable.java index 29c7242..4c02b16 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalRTreeSearchOperatorNodePushable.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalRTreeSearchOperatorNodePushable.java @@ -23,23 +23,31 @@ import java.io.IOException; import org.apache.hyracks.api.comm.VSizeFrame; import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; +import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference; -import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback; -import org.apache.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor; -import org.apache.hyracks.storage.am.lsm.rtree.dataflow.ExternalRTreeDataflowHelper; +import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory; +import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; import org.apache.hyracks.storage.am.lsm.rtree.impls.ExternalRTree; import org.apache.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorNodePushable; +import org.apache.hyracks.storage.common.ISearchOperationCallback; public 
class ExternalRTreeSearchOperatorNodePushable extends RTreeSearchOperatorNodePushable { - public ExternalRTreeSearchOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, - int partition, IRecordDescriptorProvider recordDescProvider, int[] keyFields) throws HyracksDataException { - super(opDesc, ctx, partition, recordDescProvider, keyFields, false, null, null); + private final int version; + + public ExternalRTreeSearchOperatorNodePushable(IHyracksTaskContext ctx, int partition, + RecordDescriptor inputRecDesc, int[] keyFields, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, + IIndexDataflowHelperFactory indexHelperFactory, boolean retainInput, boolean retainMissing, + IMissingWriterFactory missingWriterFactory, ISearchOperationCallbackFactory searchCallbackFactory, + int version) throws HyracksDataException { + super(ctx, partition, inputRecDesc, keyFields, minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory, + retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, false); + this.version = version; } // We override this method to specify the searched version of the index @@ -48,7 +56,6 @@ public class ExternalRTreeSearchOperatorNodePushable extends RTreeSearchOperator writer.open(); accessor = new FrameTupleAccessor(inputRecDesc); indexHelper.open(); - ExternalRTreeDataflowHelper rTreeDataflowHelper = (ExternalRTreeDataflowHelper) indexHelper; index = indexHelper.getIndexInstance(); if (retainMissing) { int fieldCount = getFieldCount(); @@ -71,10 +78,11 @@ public class ExternalRTreeSearchOperatorNodePushable extends RTreeSearchOperator tb = new ArrayTupleBuilder(recordDesc.getFieldCount()); dos = tb.getDataOutput(); appender = new FrameTupleAppender(new VSizeFrame(ctx)); - ISearchOperationCallback searchCallback = opDesc.getSearchOpCallbackFactory() - .createSearchOperationCallback(indexHelper.getResource().getId(), ctx, null); - // The next line is the reason we override this method - 
indexAccessor = rTreeIndex.createAccessor(searchCallback, rTreeDataflowHelper.getTargetVersion()); + ISearchOperationCallback searchCallback = + searchCallbackFactory.createSearchOperationCallback(indexHelper.getResource().getId(), ctx, null); + // The next line is the reason we override this method... + // The right thing to do would be to change the signature of createAccessor + indexAccessor = rTreeIndex.createAccessor(searchCallback, version); cursor = createCursor(); if (retainInput) { frameTuple = new FrameTupleReference(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java index 96a6848..93acb26 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java @@ -42,7 +42,7 @@ public class ExternalScanOperatorDescriptor extends AbstractSingleActivityOperat public ExternalScanOperatorDescriptor(JobSpecification spec, RecordDescriptor rDesc, IAdapterFactory dataSourceAdapterFactory) { super(spec, 0, 1); - recordDescriptors[0] = rDesc; + outRecDescs[0] = rDesc; this.adapterFactory = dataSourceAdapterFactory; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java ---------------------------------------------------------------------- diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java index 97e5511..df2869e 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java @@ -56,7 +56,7 @@ public class FeedCollectOperatorDescriptor extends AbstractSingleActivityOperato public FeedCollectOperatorDescriptor(JobSpecification spec, FeedConnectionId feedConnectionId, ARecordType atype, RecordDescriptor rDesc, Map<String, String> feedPolicyProperties, FeedRuntimeType subscriptionLocation) { super(spec, 1, 1); - this.recordDescriptors[0] = rDesc; + this.outRecDescs[0] = rDesc; this.outputType = atype; this.connectionId = feedConnectionId; this.feedPolicyProperties = feedPolicyProperties; @@ -83,7 +83,7 @@ public class FeedCollectOperatorDescriptor extends AbstractSingleActivityOperato } public RecordDescriptor getRecordDescriptor() { - return recordDescriptors[0]; + return outRecDescs[0]; } public FeedRuntimeType getSubscriptionLocation() { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java index 1dce8ee..3a06a2b 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java +++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java @@ -74,7 +74,7 @@ public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperator this.adaptorFactory = adapterFactory; this.adapterOutputType = adapterOutputType; this.policyAccessor = policyAccessor; - this.recordDescriptors[0] = rDesc; + this.outRecDescs[0] = rDesc; } public FeedIntakeOperatorDescriptor(JobSpecification spec, IFeed primaryFeed, String adapterLibraryName, @@ -87,7 +87,7 @@ public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperator this.adaptorConfiguration = primaryFeed.getAdapterConfiguration(); this.adapterOutputType = adapterOutputType; this.policyAccessor = policyAccessor; - this.recordDescriptors[0] = rDesc; + this.outRecDescs[0] = rDesc; } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java index 6acaefb..0558ead 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java @@ -95,8 +95,8 @@ public class FeedMetaComputeNodePushable extends AbstractUnaryInputUnaryOutputOp */ public FeedMetaComputeNodePushable(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions, IOperatorDescriptor coreOperator, FeedConnectionId feedConnectionId, - Map<String, String> feedPolicyProperties, String operationId, - 
FeedMetaOperatorDescriptor feedMetaOperatorDescriptor) throws HyracksDataException { + Map<String, String> feedPolicyProperties, FeedMetaOperatorDescriptor feedMetaOperatorDescriptor) + throws HyracksDataException { this.ctx = ctx; this.coreOperator = (AbstractUnaryInputUnaryOutputOperatorNodePushable) ((IActivity) coreOperator) .createPushRuntime(ctx, recordDescProvider, partition, nPartitions); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java index cffd303..74858ce 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java @@ -72,20 +72,17 @@ public class FeedMetaOperatorDescriptor extends AbstractSingleActivityOperatorDe **/ private final FeedRuntimeType runtimeType; - private final String operandId; - public FeedMetaOperatorDescriptor(final JobSpecification spec, final FeedConnectionId feedConnectionId, final IOperatorDescriptor coreOperatorDescriptor, final Map<String, String> feedPolicyProperties, - final FeedRuntimeType runtimeType, final String operandId) { + final FeedRuntimeType runtimeType) { super(spec, coreOperatorDescriptor.getInputArity(), coreOperatorDescriptor.getOutputArity()); this.feedConnectionId = feedConnectionId; this.feedPolicyProperties = feedPolicyProperties; if (coreOperatorDescriptor.getOutputRecordDescriptors().length == 1) { - recordDescriptors[0] = coreOperatorDescriptor.getOutputRecordDescriptors()[0]; + 
outRecDescs[0] = coreOperatorDescriptor.getOutputRecordDescriptors()[0]; } this.coreOperator = coreOperatorDescriptor; this.runtimeType = runtimeType; - this.operandId = operandId; } @Override @@ -96,11 +93,11 @@ public class FeedMetaOperatorDescriptor extends AbstractSingleActivityOperatorDe switch (runtimeType) { case COMPUTE: nodePushable = new FeedMetaComputeNodePushable(ctx, recordDescProvider, partition, nPartitions, - coreOperator, feedConnectionId, feedPolicyProperties, operandId, this); + coreOperator, feedConnectionId, feedPolicyProperties, this); break; case STORE: nodePushable = new FeedMetaStoreNodePushable(ctx, recordDescProvider, partition, nPartitions, - coreOperator, feedConnectionId, feedPolicyProperties, operandId, this); + coreOperator, feedConnectionId, feedPolicyProperties, this); break; default: throw new RuntimeDataException(ErrorCode.OPERATORS_FEED_META_OPERATOR_DESCRIPTOR_INVALID_RUNTIME, http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java index 87b92c2..662b543 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java @@ -79,8 +79,6 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper private final IHyracksTaskContext ctx; - private final String targetId; - private final VSizeFrame message; private final IRecordDescriptorProvider recordDescProvider; @@ -89,8 +87,8 @@ public class 
FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper public FeedMetaStoreNodePushable(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions, IOperatorDescriptor coreOperator, FeedConnectionId feedConnectionId, - Map<String, String> feedPolicyProperties, String targetId, - FeedMetaOperatorDescriptor feedMetaOperatorDescriptor) throws HyracksDataException { + Map<String, String> feedPolicyProperties, FeedMetaOperatorDescriptor feedMetaOperatorDescriptor) + throws HyracksDataException { this.ctx = ctx; this.insertOperator = (AbstractUnaryInputUnaryOutputOperatorNodePushable) ((IActivity) coreOperator) .createPushRuntime(ctx, recordDescProvider, partition, nPartitions); @@ -99,7 +97,6 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper this.connectionId = feedConnectionId; this.feedManager = (ActiveManager) ((INcApplicationContext) ctx.getJobletContext().getServiceContext() .getApplicationContext()).getActiveManager(); - this.targetId = targetId; this.message = new VSizeFrame(ctx); TaskUtil.putInSharedMap(HyracksConstants.KEY_MESSAGE, message, ctx); this.recordDescProvider = recordDescProvider; @@ -109,7 +106,7 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper @Override public void open() throws HyracksDataException { ActiveRuntimeId runtimeId = new ActiveRuntimeId(connectionId.getFeedId(), - runtimeType.toString() + "." + targetId, partition); + runtimeType.toString() + "." 
+ connectionId.getDatasetName(), partition); try { initializeNewFeedRuntime(runtimeId); insertOperator.open(); @@ -123,16 +120,15 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper fta = new FrameTupleAccessor(recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0)); insertOperator.setOutputFrameWriter(0, writer, recordDesc); if (insertOperator instanceof LSMInsertDeleteOperatorNodePushable) { - LSMInsertDeleteOperatorNodePushable indexOp = - (LSMInsertDeleteOperatorNodePushable) insertOperator; + LSMInsertDeleteOperatorNodePushable indexOp = (LSMInsertDeleteOperatorNodePushable) insertOperator; if (!indexOp.isPrimary()) { writer = insertOperator; return; } } if (policyAccessor.flowControlEnabled()) { - writer = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, insertOperator, - policyAccessor, fta, feedManager.getFramePool()); + writer = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, insertOperator, policyAccessor, fta, + feedManager.getFramePool()); } else { writer = new SyncFeedRuntimeInputHandler(ctx, insertOperator, fta); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/IndexInfoOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/IndexInfoOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/IndexInfoOperatorDescriptor.java deleted file mode 100644 index 050cbcf..0000000 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/IndexInfoOperatorDescriptor.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.operators; - -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.dataflow.ActivityId; -import org.apache.hyracks.api.dataflow.IOperatorNodePushable; -import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory; -import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; -import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; -import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider; -import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory; -import org.apache.hyracks.storage.am.common.api.IPageManagerFactory; -import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory; -import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory; -import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; -import org.apache.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor; -import org.apache.hyracks.storage.common.IStorageManager; -import org.apache.hyracks.storage.common.file.ILocalResourceFactoryProvider; - -/* - * This is a hack used to optain 
multiple index instances in a single operator and it is not actually used as an operator - */ -public class IndexInfoOperatorDescriptor implements IIndexOperatorDescriptor{ - - private static final long serialVersionUID = 1L; - private final IFileSplitProvider fileSplitProvider; - private final IStorageManager storageManager; - private final IIndexLifecycleManagerProvider lifecycleManagerProvider; - public IndexInfoOperatorDescriptor(IFileSplitProvider fileSplitProvider,IStorageManager storageManager, - IIndexLifecycleManagerProvider lifecycleManagerProvider){ - this.fileSplitProvider = fileSplitProvider; - this.lifecycleManagerProvider = lifecycleManagerProvider; - this.storageManager = storageManager; - } - - @Override - public ActivityId getActivityId() { - return null; - } - - @Override - public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, - IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException { - return null; - } - - @Override - public IFileSplitProvider getFileSplitProvider() { - return fileSplitProvider; - } - - @Override - public IStorageManager getStorageManager() { - return storageManager; - } - - @Override - public IIndexLifecycleManagerProvider getLifecycleManagerProvider() { - return lifecycleManagerProvider; - } - - @Override - public RecordDescriptor getRecordDescriptor() { - return null; - } - - @Override - public IIndexDataflowHelperFactory getIndexDataflowHelperFactory() { - return null; - } - - @Override - public boolean getRetainInput() { - return false; - } - - @Override - public ISearchOperationCallbackFactory getSearchOpCallbackFactory() { - return null; - } - - @Override - public IModificationOperationCallbackFactory getModificationOpCallbackFactory() { - return null; - } - - @Override - public ITupleFilterFactory getTupleFilterFactory() { - return null; - } - - @Override - public ILocalResourceFactoryProvider getLocalResourceFactoryProvider() { - return null; - } - - 
@Override - public boolean getRetainMissing() { - return false; - } - - @Override - public IMissingWriterFactory getMissingWriterFactory() { - return null; - } - - @Override - public IPageManagerFactory getPageManagerFactory() { - return null; - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java index b794ee1..ec7de91 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java @@ -108,13 +108,13 @@ public class FeedUtils { public static FeedLogManager getFeedLogManager(IHyracksTaskContext ctx, int partition, FileSplit[] feedLogFileSplits) throws HyracksDataException { return new FeedLogManager( - FeedUtils.getAbsoluteFileRef(feedLogFileSplits[partition].getPath(), 0, ctx.getIOManager()).getFile()); + FeedUtils.getAbsoluteFileRef(feedLogFileSplits[partition].getPath(), 0, ctx.getIoManager()).getFile()); } public static FeedLogManager getFeedLogManager(IHyracksTaskContext ctx, FileSplit feedLogFileSplit) throws HyracksDataException { return new FeedLogManager(FeedUtils.getAbsoluteFileRef(feedLogFileSplit.getPath(), - 0, ctx.getIOManager()).getFile()); + 0, ctx.getIoManager()).getFile()); } public static void processFeedMessage(ByteBuffer input, VSizeFrame message, FrameTupleAccessor fta) http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java ---------------------------------------------------------------------- diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java index d929eda..192ee99 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java @@ -108,16 +108,16 @@ import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; import org.apache.hyracks.dataflow.common.utils.TupleUtils; import org.apache.hyracks.storage.am.btree.impls.RangePredicate; -import org.apache.hyracks.storage.am.common.api.IIndex; -import org.apache.hyracks.storage.am.common.api.IIndexAccessor; -import org.apache.hyracks.storage.am.common.api.IIndexCursor; -import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback; import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor; import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; -import org.apache.hyracks.storage.am.common.ophelpers.MultiComparator; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex; +import org.apache.hyracks.storage.common.IIndex; +import org.apache.hyracks.storage.common.IIndexAccessor; +import org.apache.hyracks.storage.common.IIndexCursor; +import org.apache.hyracks.storage.common.IModificationOperationCallback; +import org.apache.hyracks.storage.common.MultiComparator; public class MetadataNode implements IMetadataNode { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IIndexDataflowHelperFactoryProvider.java 
---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IIndexDataflowHelperFactoryProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IIndexDataflowHelperFactoryProvider.java deleted file mode 100644 index 679d6eb..0000000 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IIndexDataflowHelperFactoryProvider.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.asterix.metadata.api; - -import java.util.Map; - -import org.apache.asterix.metadata.declared.MetadataProvider; -import org.apache.asterix.metadata.entities.Dataset; -import org.apache.asterix.metadata.entities.Index; -import org.apache.asterix.om.types.ARecordType; -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; -import org.apache.hyracks.api.dataflow.value.ITypeTraits; -import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; - -@FunctionalInterface -public interface IIndexDataflowHelperFactoryProvider { - - /** - * Get the index dataflow helper factory - * - * @param mdProvider - * the system's metadata provider - * @param dataset - * the index dataset - * @param index - * the index - * @param recordType - * the dataset's record type - * @param metaType - * the detaset's meta type - * @param mergePolicyFactory - * the index's merge policy factory - * @param mergePolicyProperties - * the index's merge policy properties - * @param filterTypeTraits - * the dataset's filter type traits - * @param filterCmpFactories - * the dataset's filter comparator factories - * @return the index dataflow helper factory - * @throws AlgebricksException - * if the dataflow helper factory couldn't be created for the index - */ - IIndexDataflowHelperFactory getIndexDataflowHelperFactory(MetadataProvider mdProvider, Dataset dataset, Index index, - ARecordType recordType, ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory, - Map<String, String> mergePolicyProperties, ITypeTraits[] filterTypeTraits, - IBinaryComparatorFactory[] filterCmpFactories) throws AlgebricksException; -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IResourceFactoryProvider.java 
---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IResourceFactoryProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IResourceFactoryProvider.java new file mode 100644 index 0000000..36660c0 --- /dev/null +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IResourceFactoryProvider.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.metadata.api; + +import java.util.Map; + +import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.asterix.metadata.entities.Dataset; +import org.apache.asterix.metadata.entities.Index; +import org.apache.asterix.om.types.ARecordType; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; +import org.apache.hyracks.api.dataflow.value.ITypeTraits; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; +import org.apache.hyracks.storage.common.IResourceFactory; + +@FunctionalInterface +public interface IResourceFactoryProvider { + + /** + * Get the index resource factory + * + * @param mdProvider + * the system's metadata provider + * @param dataset + * the index dataset + * @param index + * the index + * @param recordType + * the dataset's record type + * @param metaType + * the dataset's meta type + * @param mergePolicyFactory + * the index's merge policy factory + * @param mergePolicyProperties + * the index's merge policy properties + * @param filterTypeTraits + * the dataset's filter type traits + * @param filterCmpFactories + * the dataset's filter comparator factories + * @return the index resource factory + * @throws AlgebricksException + * if the resource factory couldn't be created for the index + */ + IResourceFactory getResourceFactory(MetadataProvider mdProvider, Dataset dataset, Index index, + ARecordType recordType, ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory, + Map<String, String> mergePolicyProperties, ITypeTraits[] filterTypeTraits, + IBinaryComparatorFactory[] filterCmpFactories) throws AlgebricksException; +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java ----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java index 3170a68..272cced 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java @@ -26,17 +26,18 @@ import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; -import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.api.IDatasetLifecycleManager; +import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.cluster.ClusterPartition; import org.apache.asterix.common.config.ClusterProperties; import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.config.GlobalConfig; import org.apache.asterix.common.config.MetadataProperties; +import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider; import org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory; +import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory; -import org.apache.asterix.common.transactions.Resource; import org.apache.asterix.common.utils.StoragePathUtil; import org.apache.asterix.external.adapter.factory.GenericAdapterFactory; import org.apache.asterix.external.api.IAdapterFactory; @@ -68,8 +69,7 @@ import org.apache.asterix.om.types.IAType; import org.apache.asterix.runtime.formats.NonTaggedDataFormat; import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerFactory; import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerFactory; -import 
org.apache.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadata; -import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider; +import org.apache.asterix.transaction.management.resource.DatasetLocalResourceFactory; import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode; import org.apache.hyracks.api.application.INCServiceContext; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; @@ -77,9 +77,10 @@ import org.apache.hyracks.api.dataflow.value.ITypeTraits; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.api.io.IIOManager; -import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame; -import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTree; -import org.apache.hyracks.storage.am.lsm.btree.utils.LSMBTreeUtil; +import org.apache.hyracks.storage.am.common.api.IIndexBuilder; +import org.apache.hyracks.storage.am.common.build.IndexBuilder; +import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelper; +import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeLocalResourceFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory; @@ -87,12 +88,10 @@ import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache; import org.apache.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyFactory; import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory; import org.apache.hyracks.storage.am.lsm.common.impls.PrefixMergePolicyFactory; +import org.apache.hyracks.storage.common.ILocalResourceRepository; +import org.apache.hyracks.storage.common.LocalResource; import 
org.apache.hyracks.storage.common.buffercache.IBufferCache; import org.apache.hyracks.storage.common.file.IFileMapProvider; -import org.apache.hyracks.storage.common.file.ILocalResourceFactory; -import org.apache.hyracks.storage.common.file.ILocalResourceFactoryProvider; -import org.apache.hyracks.storage.common.file.ILocalResourceRepository; -import org.apache.hyracks.storage.common.file.LocalResource; /** * Initializes the remote metadata storage facilities ("universe") using a @@ -335,47 +334,38 @@ public class MetadataBootstrap { ClusterProperties.INSTANCE.getStorageDirectoryName(), metadataPartition.getPartitionId()); String resourceName = metadataPartitionPath + File.separator + index.getFileNameRelativePath(); FileReference file = ioManager.getFileReference(metadataDeviceId, resourceName); - + index.setFile(file); // this should not be done this way. dataset lifecycle manager shouldn't return virtual buffer caches for // a dataset that was not yet created List<IVirtualBufferCache> virtualBufferCaches = appContext.getDatasetLifecycleManager() .getVirtualBufferCaches(index.getDatasetId().getId(), metadataPartition.getIODeviceNum()); ITypeTraits[] typeTraits = index.getTypeTraits(); - IBinaryComparatorFactory[] comparatorFactories = index.getKeyBinaryComparatorFactory(); + IBinaryComparatorFactory[] cmpFactories = index.getKeyBinaryComparatorFactory(); int[] bloomFilterKeyFields = index.getBloomFilterKeyFields(); - LSMBTree lsmBtree; - long resourceID; + // opTrackerProvider and ioOpCallbackFactory should both be acquired through IStorageManager // We are unable to do this since IStorageManager needs a dataset to determine the appropriate // objects - ILSMOperationTrackerFactory opTrackerProvider = + ILSMOperationTrackerFactory opTrackerFactory = index.isPrimaryIndex() ? 
new PrimaryIndexOperationTrackerFactory(index.getDatasetId().getId()) : new SecondaryIndexOperationTrackerFactory(index.getDatasetId().getId()); ILSMIOOperationCallbackFactory ioOpCallbackFactory = LSMBTreeIOOperationCallbackFactory.INSTANCE; + IStorageComponentProvider storageComponentProvider = appContext.getStorageComponentProvider(); if (isNewUniverse()) { + LSMBTreeLocalResourceFactory lsmBtreeFactory = new LSMBTreeLocalResourceFactory( + storageComponentProvider.getStorageManager(), typeTraits, cmpFactories, null, null, null, + opTrackerFactory, ioOpCallbackFactory, storageComponentProvider.getMetadataPageManagerFactory(), + new AsterixVirtualBufferCacheProvider(index.getDatasetId().getId()), + storageComponentProvider.getIoOperationSchedulerProvider(), + appContext.getMetadataMergePolicyFactory(), GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES, true, + bloomFilterKeyFields, appContext.getBloomFilterFalsePositiveRate(), true, null); + DatasetLocalResourceFactory dsLocalResourceFactory = + new DatasetLocalResourceFactory(index.getDatasetId().getId(), lsmBtreeFactory); // TODO(amoudi) Creating the index should be done through the same code path as other indexes // This is to be done by having a metadata dataset associated with each index - lsmBtree = LSMBTreeUtil.createLSMTree(ioManager, virtualBufferCaches, file, bufferCache, fileMapProvider, - typeTraits, comparatorFactories, bloomFilterKeyFields, - appContext.getBloomFilterFalsePositiveRate(), - appContext.getMetadataMergePolicyFactory().createMergePolicy( - GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES, dataLifecycleManager), - opTrackerProvider.getOperationTracker(ncServiceCtx), appContext.getLSMIOScheduler(), - ioOpCallbackFactory.createIoOpCallback(), index.isPrimaryIndex(), null, null, null, null, true, - appContext.getStorageComponentProvider().getMetadataPageManagerFactory()); - lsmBtree.create(); - resourceID = index.getResourceId(); - Resource localResourceMetadata = new 
LSMBTreeLocalResourceMetadata(typeTraits, comparatorFactories, - bloomFilterKeyFields, index.isPrimaryIndex(), index.getDatasetId().getId(), - metadataPartition.getPartitionId(), appContext.getMetadataMergePolicyFactory(), - GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES, null, null, null, null, opTrackerProvider, - ioOpCallbackFactory, appContext.getStorageComponentProvider().getMetadataPageManagerFactory()); - ILocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider( - partition -> localResourceMetadata, LocalResource.LSMBTreeResource); - ILocalResourceFactory localResourceFactory = localResourceFactoryProvider.getLocalResourceFactory(); - localResourceRepository.insert(localResourceFactory.createLocalResource(resourceID, resourceName, - ITreeIndexFrame.Constants.VERSION, metadataPartition.getPartitionId())); - dataLifecycleManager.register(file.getRelativePath(), lsmBtree); + IIndexBuilder indexBuilder = new IndexBuilder(ncServiceCtx, storageComponentProvider.getStorageManager(), + index::getResourceId, file, dsLocalResourceFactory, true); + indexBuilder.build(); } else { final LocalResource resource = localResourceRepository.get(file.getRelativePath()); if (resource == null) { @@ -384,26 +374,14 @@ public class MetadataBootstrap { .get(appContext.getTransactionSubsystem().getId()) + " to intialize as a new instance. (WARNING: all data will be lost.)"); } - resourceID = resource.getId(); + // Why do we care about metadata dataset's resource ids? why not assign them ids similar to other resources? 
if (index.getResourceId() != resource.getId()) { throw new HyracksDataException("Resource Id doesn't match expected metadata index resource id"); } - lsmBtree = (LSMBTree) dataLifecycleManager.get(file.getRelativePath()); - if (lsmBtree == null) { - lsmBtree = LSMBTreeUtil.createLSMTree(ioManager, virtualBufferCaches, file, bufferCache, - fileMapProvider, typeTraits, comparatorFactories, bloomFilterKeyFields, - appContext.getBloomFilterFalsePositiveRate(), - appContext.getMetadataMergePolicyFactory().createMergePolicy( - GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES, dataLifecycleManager), - opTrackerProvider.getOperationTracker(ncServiceCtx), appContext.getLSMIOScheduler(), - LSMBTreeIOOperationCallbackFactory.INSTANCE.createIoOpCallback(), index.isPrimaryIndex(), null, - null, null, null, true, - appContext.getStorageComponentProvider().getMetadataPageManagerFactory()); - dataLifecycleManager.register(file.getRelativePath(), lsmBtree); - } + IndexDataflowHelper indexHelper = new IndexDataflowHelper(ncServiceCtx, storageComponentProvider.getStorageManager(), file); + indexHelper.open(); // Opening the index through the helper will ensure it gets instantiated + indexHelper.close(); } - index.setResourceId(resourceID); - index.setFile(file); } public static String getOutputDir() { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeDataflowHelperFactoryProvider.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeDataflowHelperFactoryProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeDataflowHelperFactoryProvider.java deleted file mode 100644 index f3913e1..0000000 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeDataflowHelperFactoryProvider.java +++ /dev/null @@ 
-1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.metadata.declared; - -import java.util.Map; - -import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider; -import org.apache.asterix.common.context.IStorageComponentProvider; -import org.apache.asterix.common.exceptions.CompilationException; -import org.apache.asterix.common.exceptions.ErrorCode; -import org.apache.asterix.external.indexing.IndexingConstants; -import org.apache.asterix.metadata.api.IIndexDataflowHelperFactoryProvider; -import org.apache.asterix.metadata.entities.Dataset; -import org.apache.asterix.metadata.entities.Index; -import org.apache.asterix.metadata.lock.ExternalDatasetsRegistry; -import org.apache.asterix.metadata.utils.IndexUtil; -import org.apache.asterix.om.types.ARecordType; -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; -import org.apache.hyracks.api.dataflow.value.ITypeTraits; -import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; -import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeDataflowHelperFactory; -import 
org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeWithBuddyDataflowHelperFactory; -import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; - -public class BTreeDataflowHelperFactoryProvider implements IIndexDataflowHelperFactoryProvider { - - public static final BTreeDataflowHelperFactoryProvider INSTANCE = new BTreeDataflowHelperFactoryProvider(); - - private BTreeDataflowHelperFactoryProvider() { - } - - public static String externalFileIndexName(Dataset dataset) { - return dataset.getDatasetName().concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX); - } - - @Override - public IIndexDataflowHelperFactory getIndexDataflowHelperFactory(MetadataProvider mdProvider, Dataset dataset, - Index index, ARecordType recordType, ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory, - Map<String, String> mergePolicyProperties, ITypeTraits[] filterTypeTraits, - IBinaryComparatorFactory[] filterCmpFactories) throws AlgebricksException { - int[] filterFields = IndexUtil.getFilterFields(dataset, index, filterTypeTraits); - int[] btreeFields = IndexUtil.getBtreeFieldsIfFiltered(dataset, index); - IStorageComponentProvider storageComponentProvider = mdProvider.getStorageComponentProvider(); - switch (dataset.getDatasetType()) { - case EXTERNAL: - return index.getIndexName().equals(externalFileIndexName(dataset)) - ? 
new ExternalBTreeDataflowHelperFactory(mergePolicyFactory, mergePolicyProperties, - dataset.getIndexOperationTrackerFactory(index), - storageComponentProvider.getIoOperationSchedulerProvider(), - dataset.getIoOperationCallbackFactory(index), - mdProvider.getStorageProperties().getBloomFilterFalsePositiveRate(), - ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, mdProvider), - !dataset.getDatasetDetails().isTemp()) - : new ExternalBTreeWithBuddyDataflowHelperFactory(mergePolicyFactory, mergePolicyProperties, - dataset.getIndexOperationTrackerFactory(index), - storageComponentProvider.getIoOperationSchedulerProvider(), - dataset.getIoOperationCallbackFactory(index), - mdProvider.getStorageProperties().getBloomFilterFalsePositiveRate(), - new int[] { index.getKeyFieldNames().size() }, - ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, mdProvider), - !dataset.getDatasetDetails().isTemp()); - case INTERNAL: - return new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), - mergePolicyFactory, mergePolicyProperties, dataset.getIndexOperationTrackerFactory(index), - storageComponentProvider.getIoOperationSchedulerProvider(), - dataset.getIoOperationCallbackFactory(index), - mdProvider.getStorageProperties().getBloomFilterFalsePositiveRate(), index.isPrimaryIndex(), - filterTypeTraits, filterCmpFactories, btreeFields, filterFields, - !dataset.getDatasetDetails().isTemp()); - default: - throw new CompilationException(ErrorCode.COMPILATION_UNKNOWN_DATASET_TYPE, - dataset.getDatasetType().toString()); - } - } -}