http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInvertedIndexInsertDeleteOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInvertedIndexInsertDeleteOperatorDescriptor.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInvertedIndexInsertDeleteOperatorDescriptor.java deleted file mode 100644 index 4b28d42..0000000 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInvertedIndexInsertDeleteOperatorDescriptor.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.asterix.common.dataflow; - -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.dataflow.IOperatorNodePushable; -import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; -import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; -import org.apache.hyracks.api.dataflow.value.ITypeTraits; -import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; -import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; -import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider; -import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory; -import org.apache.hyracks.storage.am.common.api.IPageManagerFactory; -import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory; -import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory; -import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; -import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; -import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexInsertUpdateDeleteOperator; -import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory; -import org.apache.hyracks.storage.common.IStorageManager; - -public class LSMInvertedIndexInsertDeleteOperatorDescriptor extends LSMInvertedIndexInsertUpdateDeleteOperator { - - private static final long serialVersionUID = 1L; - - private final String indexName; - - public LSMInvertedIndexInsertDeleteOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor recDesc, - IStorageManager storageManager, IFileSplitProvider fileSplitProvider, - IIndexLifecycleManagerProvider lifecycleManagerProvider, ITypeTraits[] tokenTypeTraits, - IBinaryComparatorFactory[] tokenComparatorFactories, ITypeTraits[] 
invListsTypeTraits, - IBinaryComparatorFactory[] invListComparatorFactories, IBinaryTokenizerFactory tokenizerFactory, - int[] fieldPermutation, IndexOperation op, IIndexDataflowHelperFactory dataflowHelperFactory, - ITupleFilterFactory tupleFilterFactory, IModificationOperationCallbackFactory modificationOpCallbackFactory, - ISearchOperationCallbackFactory searchCallbackFactory, String indexName, - IPageManagerFactory pageManagerFactory) { - super(spec, recDesc, storageManager, fileSplitProvider, lifecycleManagerProvider, tokenTypeTraits, - tokenComparatorFactories, invListsTypeTraits, invListComparatorFactories, tokenizerFactory, - fieldPermutation, op, dataflowHelperFactory, tupleFilterFactory, modificationOpCallbackFactory, - searchCallbackFactory, pageManagerFactory); - this.indexName = indexName; - } - - @Override - public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, - IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException { - return new LSMInsertDeleteOperatorNodePushable(this, ctx, partition, fieldPermutation, recordDescProvider, op, - false); - } - - public String getIndexName() { - return indexName; - } -}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMTreeInsertDeleteOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMTreeInsertDeleteOperatorDescriptor.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMTreeInsertDeleteOperatorDescriptor.java index b9e7c23..6d58f6d 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMTreeInsertDeleteOperatorDescriptor.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMTreeInsertDeleteOperatorDescriptor.java @@ -20,74 +20,38 @@ package org.apache.asterix.common.dataflow; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.IOperatorNodePushable; -import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; -import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory; import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; -import org.apache.hyracks.api.dataflow.value.ITypeTraits; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; -import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; -import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider; import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory; -import org.apache.hyracks.storage.am.common.api.IPageManagerFactory; -import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory; import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory; import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; import 
org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexInsertUpdateDeleteOperatorDescriptor; -import org.apache.hyracks.storage.common.IStorageManager; public class LSMTreeInsertDeleteOperatorDescriptor extends LSMTreeIndexInsertUpdateDeleteOperatorDescriptor { private static final long serialVersionUID = 1L; - private final boolean isPrimary; - /** the name of the index that is being operated upon **/ - private final String indexName; - - public LSMTreeInsertDeleteOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor recDesc, - IStorageManager storageManager, IIndexLifecycleManagerProvider lifecycleManagerProvider, - IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits, - IBinaryComparatorFactory[] comparatorFactories, int[] bloomFilterKeyFields, int[] fieldPermutation, - IndexOperation op, IIndexDataflowHelperFactory dataflowHelperFactory, - ITupleFilterFactory tupleFilterFactory, boolean isPrimary, String indexName, - IMissingWriterFactory nullWriterFactory, - IModificationOperationCallbackFactory modificationOpCallbackProvider, - ISearchOperationCallbackFactory searchOpCallbackProvider, IPageManagerFactory pageManagerFactory) { - super(spec, recDesc, storageManager, lifecycleManagerProvider, fileSplitProvider, typeTraits, - comparatorFactories, bloomFilterKeyFields, fieldPermutation, op, dataflowHelperFactory, - tupleFilterFactory, nullWriterFactory, modificationOpCallbackProvider, searchOpCallbackProvider, - pageManagerFactory); + public LSMTreeInsertDeleteOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc, + int[] fieldPermutation, IndexOperation op, IIndexDataflowHelperFactory indexHelperFactory, + ITupleFilterFactory tupleFilterFactory, boolean isPrimary, + IModificationOperationCallbackFactory modCallbackFactory) { + super(spec, outRecDesc, indexHelperFactory, fieldPermutation, op, modCallbackFactory, tupleFilterFactory); 
this.isPrimary = isPrimary; - this.indexName = indexName; } @Override public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException { - return new LSMInsertDeleteOperatorNodePushable(this, ctx, partition, fieldPermutation, recordDescProvider, op, - isPrimary); + RecordDescriptor inputRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0); + return new LSMInsertDeleteOperatorNodePushable(ctx, partition, fieldPermutation, inputRecDesc, op, isPrimary, + indexHelperFactory, modCallbackFactory, tupleFilterFactory); } public boolean isPrimary() { return isPrimary; } - - public String getIndexName() { - return indexName; - } - - public int[] getFieldPermutations() { - return fieldPermutation; - } - - public IndexOperation getIndexOperation() { - return op; - } - - public IBinaryComparatorFactory[] getComparatorFactories() { - return comparatorFactories; - } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java index aeed05d..2a6efd5 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java @@ -55,6 +55,8 @@ public class ErrorCode { public static final int COERCION = 12; public static final int DUPLICATE_FIELD_NAME = 13; public static final int PROPERTY_NOT_SET = 14; + public static final int ROOT_LOCAL_RESOURCE_EXISTS = 15; + public static final int ROOT_LOCAL_RESOURCE_COULD_NOT_BE_CREATED = 16; public static final int 
INSTANTIATION_ERROR = 100; // Compilation errors http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IDataset.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IDataset.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IDataset.java index 311400f..a0e6e71 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IDataset.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IDataset.java @@ -18,6 +18,18 @@ */ package org.apache.asterix.common.metadata; +import java.util.List; + public interface IDataset { + /** + * @return the list of primary keys for the dataset + */ + List<List<String>> getPrimaryKeys(); + + /** + * @return the bloom filter fields indexes for the primary index of the dataset + */ + int[] getPrimaryBloomFilterFields(); + } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAppRuntimeContextProvider.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAppRuntimeContextProvider.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAppRuntimeContextProvider.java index c27d2de..f4b638f 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAppRuntimeContextProvider.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAppRuntimeContextProvider.java @@ -24,9 +24,9 @@ import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.hyracks.api.io.IIOManager; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler; import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker; +import org.apache.hyracks.storage.common.ILocalResourceRepository; import org.apache.hyracks.storage.common.buffercache.IBufferCache; import org.apache.hyracks.storage.common.file.IFileMapProvider; -import org.apache.hyracks.storage.common.file.ILocalResourceRepository; public interface IAppRuntimeContextProvider { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceFactory.java deleted file mode 100644 index d569855..0000000 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceFactory.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.asterix.common.transactions; - -import java.io.Serializable; - -@FunctionalInterface -public interface IResourceFactory extends Serializable { - /** - * Create a serializable resource for the task partition - * @param partition - * Hyracks task partition - * @return the serializable resource - */ - Resource resource(int partition); -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Resource.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Resource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Resource.java deleted file mode 100644 index 81cb58a..0000000 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Resource.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.asterix.common.transactions; - -import java.io.Serializable; -import java.util.List; - -import org.apache.hyracks.api.application.INCServiceContext; -import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; -import org.apache.hyracks.api.dataflow.value.ITypeTraits; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.io.IIOManager; -import org.apache.hyracks.api.io.IODeviceHandle; -import org.apache.hyracks.storage.am.common.api.IMetadataPageManagerFactory; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory; -import org.apache.hyracks.storage.common.file.LocalResource; - -/** - * TODO(amoudi): Change this class and its subclasses to use json serialization instead of Java serialization - * The base resource that will be written to disk. 
it will go in the serializable resource - * member in {@link LocalResource} - */ -public abstract class Resource implements Serializable { - - private static final long serialVersionUID = 1L; - private final int datasetId; - private final int partition; - protected final ITypeTraits[] filterTypeTraits; - protected final IBinaryComparatorFactory[] filterCmpFactories; - protected final int[] filterFields; - protected final ILSMOperationTrackerFactory opTrackerProvider; - protected final ILSMIOOperationCallbackFactory ioOpCallbackFactory; - protected final IMetadataPageManagerFactory metadataPageManagerFactory; - - public Resource(int datasetId, int partition, ITypeTraits[] filterTypeTraits, - IBinaryComparatorFactory[] filterCmpFactories, int[] filterFields, - ILSMOperationTrackerFactory opTrackerProvider, ILSMIOOperationCallbackFactory ioOpCallbackFactory, - IMetadataPageManagerFactory metadataPageManagerFactory) { - this.datasetId = datasetId; - this.partition = partition; - this.filterTypeTraits = filterTypeTraits; - this.filterCmpFactories = filterCmpFactories; - this.filterFields = filterFields; - this.opTrackerProvider = opTrackerProvider; - this.ioOpCallbackFactory = ioOpCallbackFactory; - this.metadataPageManagerFactory = metadataPageManagerFactory; - } - - public int partition() { - return partition; - } - - public int datasetId() { - return datasetId; - } - - public abstract ILSMIndex createIndexInstance(INCServiceContext ncServiceCtx, LocalResource resource) - throws HyracksDataException; - - public static int getIoDeviceNum(IIOManager ioManager, IODeviceHandle deviceHandle) { - List<IODeviceHandle> ioDevices = ioManager.getIODevices(); - for (int i = 0; i < ioDevices.size(); i++) { - IODeviceHandle device = ioDevices.get(i); - if (device == deviceHandle) { - return i; - } - } - return -1; - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ResourceFactory.java 
---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ResourceFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ResourceFactory.java deleted file mode 100644 index 891edd8..0000000 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ResourceFactory.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.asterix.common.transactions; - -import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; -import org.apache.hyracks.api.dataflow.value.ITypeTraits; -import org.apache.hyracks.storage.am.common.api.IMetadataPageManagerFactory; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory; - -public abstract class ResourceFactory implements IResourceFactory { - - private static final long serialVersionUID = 1L; - protected final int datasetId; - protected final ITypeTraits[] filterTypeTraits; - protected final IBinaryComparatorFactory[] filterCmpFactories; - protected final int[] filterFields; - protected final ILSMOperationTrackerFactory opTrackerProvider; - protected final ILSMIOOperationCallbackFactory ioOpCallbackFactory; - protected final IMetadataPageManagerFactory metadataPageManagerFactory; - - public ResourceFactory(int datasetId, ITypeTraits[] filterTypeTraits, - IBinaryComparatorFactory[] filterCmpFactories, int[] filterFields, - ILSMOperationTrackerFactory opTrackerProvider, ILSMIOOperationCallbackFactory ioOpCallbackFactory, - IMetadataPageManagerFactory metadataPageManagerFactory) { - this.datasetId = datasetId; - this.filterTypeTraits = filterTypeTraits; - this.filterCmpFactories = filterCmpFactories; - this.filterFields = filterFields; - this.opTrackerProvider = opTrackerProvider; - this.ioOpCallbackFactory = ioOpCallbackFactory; - this.metadataPageManagerFactory = metadataPageManagerFactory; - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java index 46cd476..1aa1474 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java @@ -77,6 +77,12 @@ public class StoragePathUtil { return Integer.parseInt(name.substring(PARTITION_DIR_PREFIX.length())); } + public static int getPartitionNumFromRelativePath(String relativePath) { + int startIdx = relativePath.indexOf(PARTITION_DIR_PREFIX) + PARTITION_DIR_PREFIX.length(); + String partition = relativePath.substring(startIdx, relativePath.indexOf(File.separatorChar, startIdx)); + return Integer.parseInt(partition); + } + /** * @param fileAbsolutePath * @return the file relative path starting from the partition directory http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties index 1387c6f..9b2ad8a 100644 --- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties +++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties @@ -48,6 +48,8 @@ 11 = Index out of bound in %1$s: %2$s 12 = Invalid implicit scalar to collection coercion in %1$s 14 = Property %1$s not set +15 = Storage metadata directory of %1$s in %2$s already exists +16 = Storage metadata directory of %1$s in %2$s couldn't be created 100 = Unable to instantiate class %1$s # Compile-time check errors http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFileIndexAccessor.java 
---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFileIndexAccessor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFileIndexAccessor.java index cdb40d8..c11fb61 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFileIndexAccessor.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFileIndexAccessor.java @@ -25,7 +25,6 @@ import java.util.Date; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; -import org.apache.asterix.external.operators.ExternalLookupOperatorDescriptor; import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; import org.apache.asterix.om.base.ADateTime; import org.apache.asterix.om.base.AInt64; @@ -41,12 +40,13 @@ import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; import org.apache.hyracks.storage.am.btree.impls.RangePredicate; import org.apache.hyracks.storage.am.btree.util.BTreeUtils; -import org.apache.hyracks.storage.am.common.api.IIndexCursor; -import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback; -import org.apache.hyracks.storage.am.common.ophelpers.MultiComparator; -import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeDataflowHelper; +import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper; +import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory; import org.apache.hyracks.storage.am.lsm.btree.impls.ExternalBTree; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; +import org.apache.hyracks.storage.common.IIndexCursor; +import org.apache.hyracks.storage.common.ISearchOperationCallback; +import 
org.apache.hyracks.storage.common.MultiComparator; /* * This class was created specifically to facilitate accessing @@ -54,11 +54,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; */ @SuppressWarnings({ "rawtypes", "unchecked" }) public class ExternalFileIndexAccessor { - - private final FilesIndexDescription filesIndexDescription = new FilesIndexDescription(); - private ExternalBTreeDataflowHelper indexDataflowHelper; - private ExternalLookupOperatorDescriptor opDesc; - + private IIndexDataflowHelper indexDataflowHelper; private IHyracksTaskContext ctx; private ExternalBTree index; private ArrayTupleBuilder searchKeyTupleBuilder; @@ -69,11 +65,15 @@ public class ExternalFileIndexAccessor { private RangePredicate searchPredicate; private ILSMIndexAccessor fileIndexAccessor; private IIndexCursor fileIndexSearchCursor; + private ISearchOperationCallbackFactory searchCallbackFactory; + private int version; + private ISerializerDeserializer externalFileRecordSerde = FilesIndexDescription.createExternalFileRecordSerde(); - public ExternalFileIndexAccessor(ExternalBTreeDataflowHelper indexDataflowHelper, - ExternalLookupOperatorDescriptor opDesc) { + public ExternalFileIndexAccessor(IIndexDataflowHelper indexDataflowHelper, + ISearchOperationCallbackFactory searchCallbackFactory, int version) { this.indexDataflowHelper = indexDataflowHelper; - this.opDesc = opDesc; + this.searchCallbackFactory = searchCallbackFactory; + this.version = version; } public void open() throws HyracksDataException { @@ -90,9 +90,9 @@ public class ExternalFileIndexAccessor { searchPredicate = new RangePredicate(searchKey, searchKey, true, true, searchCmp, searchCmp); // create the accessor and the cursor using the passed version - ISearchOperationCallback searchCallback = opDesc.getSearchOpCallbackFactory() + ISearchOperationCallback searchCallback = searchCallbackFactory .createSearchOperationCallback(indexDataflowHelper.getResource().getId(), ctx, null); - 
fileIndexAccessor = index.createAccessor(searchCallback, indexDataflowHelper.getVersion()); + fileIndexAccessor = index.createAccessor(searchCallback, version); fileIndexSearchCursor = fileIndexAccessor.createSearchCursor(false); } @@ -115,7 +115,7 @@ public class ExternalFileIndexAccessor { int recordLength = tuple.getFieldLength(FilesIndexDescription.FILE_PAYLOAD_INDEX); ByteArrayInputStream stream = new ByteArrayInputStream(serRecord, recordStartOffset, recordLength); DataInput in = new DataInputStream(stream); - ARecord externalFileRecord = (ARecord) filesIndexDescription.EXTERNAL_FILE_RECORD_SERDE.deserialize(in); + ARecord externalFileRecord = (ARecord) externalFileRecordSerde.deserialize(in); setFile(externalFileRecord, file); } else { // This should never happen http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/FileIndexTupleTranslator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/FileIndexTupleTranslator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/FileIndexTupleTranslator.java index 970b02a..2597782 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/FileIndexTupleTranslator.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/FileIndexTupleTranslator.java @@ -35,33 +35,30 @@ import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; @SuppressWarnings("unchecked") public class FileIndexTupleTranslator { - private final FilesIndexDescription filesIndexDescription = new FilesIndexDescription(); - private ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder( - filesIndexDescription.FILE_INDEX_RECORD_DESCRIPTOR.getFieldCount()); + private final ArrayTupleBuilder tupleBuilder = new 
ArrayTupleBuilder(FilesIndexDescription.FILE_INDEX_TUPLE_SIZE); private RecordBuilder recordBuilder = new RecordBuilder(); private ArrayBackedValueStorage fieldValue = new ArrayBackedValueStorage(); private AMutableInt32 aInt32 = new AMutableInt32(0); private AMutableInt64 aInt64 = new AMutableInt64(0); private AMutableString aString = new AMutableString(null); private AMutableDateTime aDateTime = new AMutableDateTime(0); - private ISerializerDeserializer<IAObject> stringSerde = SerializerDeserializerProvider.INSTANCE - .getSerializerDeserializer(BuiltinType.ASTRING); - private ISerializerDeserializer<IAObject> dateTimeSerde = SerializerDeserializerProvider.INSTANCE - .getSerializerDeserializer(BuiltinType.ADATETIME); - private ISerializerDeserializer<IAObject> longSerde = SerializerDeserializerProvider.INSTANCE - .getSerializerDeserializer(BuiltinType.AINT64); + private ISerializerDeserializer<IAObject> stringSerde = + SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING); + private ISerializerDeserializer<IAObject> dateTimeSerde = + SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADATETIME); + private ISerializerDeserializer<IAObject> longSerde = + SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64); private ArrayTupleReference tuple = new ArrayTupleReference(); public ITupleReference getTupleFromFile(ExternalFile file) throws HyracksDataException { tupleBuilder.reset(); //File Number aInt32.setValue(file.getFileNumber()); - filesIndexDescription.FILE_INDEX_RECORD_DESCRIPTOR.getFields()[0].serialize(aInt32, - tupleBuilder.getDataOutput()); + FilesIndexDescription.FILE_NUMBER_SERDE.serialize(aInt32, tupleBuilder.getDataOutput()); tupleBuilder.addFieldEndOffset(); //File Record - recordBuilder.reset(filesIndexDescription.EXTERNAL_FILE_RECORD_TYPE); + recordBuilder.reset(FilesIndexDescription.EXTERNAL_FILE_RECORD_TYPE); // write field 0 (File Name) fieldValue.reset(); 
aString.setValue(file.getFileName()); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/FilesIndexDescription.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/FilesIndexDescription.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/FilesIndexDescription.java index 152fa8b..45ccfa0 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/FilesIndexDescription.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/FilesIndexDescription.java @@ -42,57 +42,50 @@ public class FilesIndexDescription { public static final int FILE_KEY_SIZE = 1; public static final int FILE_PAYLOAD_INDEX = 1; private static final String[] payloadFieldNames = { "FileName", "FileSize", "FileModDate" }; - private static final IAType[] payloadFieldTypes = { BuiltinType.ASTRING, BuiltinType.AINT64, - BuiltinType.ADATETIME }; + private static final IAType[] payloadFieldTypes = + { BuiltinType.ASTRING, BuiltinType.AINT64, BuiltinType.ADATETIME }; public static final int[] BLOOM_FILTER_FIELDS = { 0 }; public static final int EXTERNAL_FILE_NAME_FIELD_INDEX = 0; public static final int EXTERNAL_FILE_SIZE_FIELD_INDEX = 1; public static final int EXTERNAL_FILE_MOD_DATE_FIELD_INDEX = 2; - public final ARecordType EXTERNAL_FILE_RECORD_TYPE; - public final ITypeTraits[] EXTERNAL_FILE_BUDDY_BTREE_TYPE_TRAITS = new ITypeTraits[1]; - public final ITypeTraits[] EXTERNAL_FILE_INDEX_TYPE_TRAITS = new ITypeTraits[FILE_INDEX_TUPLE_SIZE]; - - public final ISerializerDeserializer EXTERNAL_FILE_RECORD_SERDE; - public final RecordDescriptor FILE_INDEX_RECORD_DESCRIPTOR; - public final RecordDescriptor FILE_BUDDY_BTREE_RECORD_DESCRIPTOR; - public final ISerializerDeserializer[] 
EXTERNAL_FILE_BUDDY_BTREE_FIELDS = new ISerializerDeserializer[1]; - public final ISerializerDeserializer[] EXTERNAL_FILE_TUPLE_FIELDS = new ISerializerDeserializer[FILE_INDEX_TUPLE_SIZE]; + public static final ARecordType EXTERNAL_FILE_RECORD_TYPE = + new ARecordType("ExternalFileRecordType", payloadFieldNames, payloadFieldTypes, true); + public static final ITypeTraits[] EXTERNAL_FILE_BUDDY_BTREE_TYPE_TRAITS = + new ITypeTraits[] { TypeTraitProvider.INSTANCE.getTypeTrait(IndexingConstants.FILE_NUMBER_FIELD_TYPE) }; + public static final ITypeTraits[] EXTERNAL_FILE_INDEX_TYPE_TRAITS = + new ITypeTraits[] { TypeTraitProvider.INSTANCE.getTypeTrait(IndexingConstants.FILE_NUMBER_FIELD_TYPE), + TypeTraitProvider.INSTANCE.getTypeTrait(EXTERNAL_FILE_RECORD_TYPE) }; public static final IBinaryComparatorFactory[] FILES_INDEX_COMP_FACTORIES = new IBinaryComparatorFactory[] { BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(BuiltinType.AINT32, true) }; + public static final ISerializerDeserializer FILE_NUMBER_SERDE = + SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(IndexingConstants.FILE_NUMBER_FIELD_TYPE); + public static final ISerializerDeserializer[] EXTERNAL_FILE_BUDDY_BTREE_FIELDS = + new ISerializerDeserializer[] { FILE_NUMBER_SERDE }; + public static final RecordDescriptor FILE_BUDDY_BTREE_RECORD_DESCRIPTOR = + new RecordDescriptor(EXTERNAL_FILE_BUDDY_BTREE_FIELDS, EXTERNAL_FILE_BUDDY_BTREE_TYPE_TRAITS); - public FilesIndexDescription() { - ARecordType type = new ARecordType("ExternalFileRecordType", payloadFieldNames, payloadFieldTypes, true); - EXTERNAL_FILE_RECORD_TYPE = type; - EXTERNAL_FILE_INDEX_TYPE_TRAITS[FILE_KEY_INDEX] = TypeTraitProvider.INSTANCE - .getTypeTrait(IndexingConstants.FILE_NUMBER_FIELD_TYPE); - EXTERNAL_FILE_INDEX_TYPE_TRAITS[FILE_PAYLOAD_INDEX] = TypeTraitProvider.INSTANCE - .getTypeTrait(EXTERNAL_FILE_RECORD_TYPE); - EXTERNAL_FILE_BUDDY_BTREE_TYPE_TRAITS[FILE_KEY_INDEX] = TypeTraitProvider.INSTANCE - 
.getTypeTrait(IndexingConstants.FILE_NUMBER_FIELD_TYPE); - - EXTERNAL_FILE_RECORD_SERDE = SerializerDeserializerProvider.INSTANCE - .getSerializerDeserializer(EXTERNAL_FILE_RECORD_TYPE); + private FilesIndexDescription() { + } - EXTERNAL_FILE_TUPLE_FIELDS[FILE_KEY_INDEX] = SerializerDeserializerProvider.INSTANCE - .getSerializerDeserializer(IndexingConstants.FILE_NUMBER_FIELD_TYPE); - EXTERNAL_FILE_TUPLE_FIELDS[FILE_PAYLOAD_INDEX] = EXTERNAL_FILE_RECORD_SERDE; - EXTERNAL_FILE_BUDDY_BTREE_FIELDS[FILE_KEY_INDEX] = SerializerDeserializerProvider.INSTANCE - .getSerializerDeserializer(IndexingConstants.FILE_NUMBER_FIELD_TYPE); + public static ISerializerDeserializer createExternalFileRecordSerde() { + return SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(EXTERNAL_FILE_RECORD_TYPE); + } - FILE_INDEX_RECORD_DESCRIPTOR = new RecordDescriptor(EXTERNAL_FILE_TUPLE_FIELDS, - EXTERNAL_FILE_INDEX_TYPE_TRAITS); + public static ISerializerDeserializer[] createExternalFileTupleFieldsSerdes() { + return new ISerializerDeserializer[] { FILE_NUMBER_SERDE, createExternalFileRecordSerde() }; + } - FILE_BUDDY_BTREE_RECORD_DESCRIPTOR = new RecordDescriptor(EXTERNAL_FILE_BUDDY_BTREE_FIELDS, - EXTERNAL_FILE_BUDDY_BTREE_TYPE_TRAITS); + public static RecordDescriptor createFileIndexRecordDescriptor() { + return new RecordDescriptor(createExternalFileTupleFieldsSerdes(), EXTERNAL_FILE_INDEX_TYPE_TRAITS); } @SuppressWarnings("unchecked") - public void getBuddyBTreeTupleFromFileNumber(ArrayTupleReference tuple, ArrayTupleBuilder tupleBuilder, + public static void getBuddyBTreeTupleFromFileNumber(ArrayTupleReference tuple, ArrayTupleBuilder tupleBuilder, AMutableInt32 aInt32) throws IOException, AsterixException { tupleBuilder.reset(); - FILE_BUDDY_BTREE_RECORD_DESCRIPTOR.getFields()[0].serialize(aInt32, tupleBuilder.getDataOutput()); + FILE_NUMBER_SERDE.serialize(aInt32, tupleBuilder.getDataOutput()); tupleBuilder.addFieldEndOffset(); 
tuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray()); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingConstants.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingConstants.java index 86f5aa5..e26b925 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingConstants.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingConstants.java @@ -50,17 +50,17 @@ public class IndexingConstants { public static final IAType ROW_NUMBER_FIELD_TYPE = BuiltinType.AINT32; //Comparator Factories - private static final IBinaryComparatorFactory fileNumberCompFactory = BinaryComparatorFactoryProvider.INSTANCE - .getBinaryComparatorFactory(BuiltinType.AINT32, true); - private static final IBinaryComparatorFactory recordOffsetCompFactory = BinaryComparatorFactoryProvider.INSTANCE - .getBinaryComparatorFactory(BuiltinType.AINT64, true); - private static final IBinaryComparatorFactory rowNumberCompFactory = BinaryComparatorFactoryProvider.INSTANCE - .getBinaryComparatorFactory(BuiltinType.AINT32, true); - - private static final IBinaryComparatorFactory[] rCFileRIDComparatorFactories = { fileNumberCompFactory, - recordOffsetCompFactory, rowNumberCompFactory }; - private static final IBinaryComparatorFactory[] txtSeqFileRIDComparatorFactories = { fileNumberCompFactory, - recordOffsetCompFactory }; + private static final IBinaryComparatorFactory fileNumberCompFactory = + BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(BuiltinType.AINT32, true); + private static final IBinaryComparatorFactory recordOffsetCompFactory = 
+ BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(BuiltinType.AINT64, true); + private static final IBinaryComparatorFactory rowNumberCompFactory = + BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(BuiltinType.AINT32, true); + + private static final IBinaryComparatorFactory[] rCFileRIDComparatorFactories = + { fileNumberCompFactory, recordOffsetCompFactory, rowNumberCompFactory }; + private static final IBinaryComparatorFactory[] txtSeqFileRIDComparatorFactories = + { fileNumberCompFactory, recordOffsetCompFactory }; private static final IBinaryComparatorFactory[] buddyBtreeComparatorFactories = { fileNumberCompFactory }; @@ -86,12 +86,12 @@ public class IndexingConstants { static { - fileNumberSerializerDeserializer = SerializerDeserializerProvider.INSTANCE - .getSerializerDeserializer(FILE_NUMBER_FIELD_TYPE); - recordOffsetSerializerDeserializer = SerializerDeserializerProvider.INSTANCE - .getSerializerDeserializer(RECORD_OFFSET_FIELD_TYPE); - rowNumberSerializerDeserializer = SerializerDeserializerProvider.INSTANCE - .getSerializerDeserializer(ROW_NUMBER_FIELD_TYPE); + fileNumberSerializerDeserializer = + SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(FILE_NUMBER_FIELD_TYPE); + recordOffsetSerializerDeserializer = + SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(RECORD_OFFSET_FIELD_TYPE); + rowNumberSerializerDeserializer = + SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(ROW_NUMBER_FIELD_TYPE); fileNumberTypeTraits = TypeTraitProvider.INSTANCE.getTypeTrait(FILE_NUMBER_FIELD_TYPE); recordOffsetTypeTraits = TypeTraitProvider.INSTANCE.getTypeTrait(RECORD_OFFSET_FIELD_TYPE); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/AbstractExternalDatasetIndexesOperatorDescriptor.java ---------------------------------------------------------------------- diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/AbstractExternalDatasetIndexesOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/AbstractExternalDatasetIndexesOperatorDescriptor.java index d9471d1..95debe3 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/AbstractExternalDatasetIndexesOperatorDescriptor.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/AbstractExternalDatasetIndexesOperatorDescriptor.java @@ -26,11 +26,10 @@ import org.apache.hyracks.api.dataflow.IOperatorNodePushable; import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.io.FileReference; -import org.apache.hyracks.api.io.FileSplit; import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; import org.apache.hyracks.dataflow.std.base.AbstractOperatorNodePushable; import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor; +import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper; import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; // This is an operator that takes a single file index and an array of secondary indexes @@ -42,25 +41,17 @@ public abstract class AbstractExternalDatasetIndexesOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor { private static final long serialVersionUID = 1L; - private IIndexDataflowHelperFactory filesIndexDataflowHelperFactory; - private IndexInfoOperatorDescriptor fileIndexInfo; private List<IIndexDataflowHelperFactory> treeIndexesDataflowHelperFactories; - private List<IndexInfoOperatorDescriptor> treeIndexesInfos; public AbstractExternalDatasetIndexesOperatorDescriptor(IOperatorDescriptorRegistry spec, - 
IIndexDataflowHelperFactory filesIndexDataflowHelperFactory, IndexInfoOperatorDescriptor fileIndexesInfo, - List<IIndexDataflowHelperFactory> treeIndexesDataflowHelperFactories, - List<IndexInfoOperatorDescriptor> indexesInfos) { + List<IIndexDataflowHelperFactory> treeIndexesDataflowHelperFactories) { super(spec, 0, 0); - this.filesIndexDataflowHelperFactory = filesIndexDataflowHelperFactory; - this.fileIndexInfo = fileIndexesInfo; this.treeIndexesDataflowHelperFactories = treeIndexesDataflowHelperFactories; - this.treeIndexesInfos = indexesInfos; } // opening and closing the index is done inside these methods since we don't always need open indexes - protected abstract void performOpOnIndex(IIndexDataflowHelperFactory indexDataflowHelperFactory, - IHyracksTaskContext ctx, IndexInfoOperatorDescriptor fileIndexInfo, int partition) throws Exception; + protected abstract void performOpOnIndex(IIndexDataflowHelper indexDataflowHelper, IHyracksTaskContext ctx) + throws HyracksDataException; @Override public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, @@ -70,16 +61,11 @@ public abstract class AbstractExternalDatasetIndexesOperatorDescriptor @Override public void initialize() throws HyracksDataException { try { - FileSplit fileSplit = fileIndexInfo.getFileSplitProvider().getFileSplits()[partition]; - FileReference fileRef = fileSplit.getFileReference(ctx.getIOManager()); - // only in partition of device id = 0, we perform the operation on the files index - if (fileRef.getDeviceHandle() == ctx.getIOManager().getIODevices().get(0)) { - performOpOnIndex(filesIndexDataflowHelperFactory, ctx, fileIndexInfo, partition); - } // perform operation on btrees for (int i = 0; i < treeIndexesDataflowHelperFactories.size(); i++) { - performOpOnIndex(treeIndexesDataflowHelperFactories.get(i), ctx, treeIndexesInfos.get(i), - partition); + IIndexDataflowHelper indexHelper = + treeIndexesDataflowHelperFactories.get(i).create(ctx, partition); + 
performOpOnIndex(indexHelper, ctx); } } catch (Exception e) { throw new HyracksDataException(e); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalBTreeSearchOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalBTreeSearchOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalBTreeSearchOperatorDescriptor.java index ac715a0..5ecd283 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalBTreeSearchOperatorDescriptor.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalBTreeSearchOperatorDescriptor.java @@ -19,45 +19,38 @@ package org.apache.asterix.external.operators; import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.dataflow.IOperatorNodePushable; -import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory; import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; -import org.apache.hyracks.api.dataflow.value.ITypeTraits; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; -import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor; -import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider; -import org.apache.hyracks.storage.am.common.api.IPageManagerFactory; import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory; import 
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; -import org.apache.hyracks.storage.common.IStorageManager; public class ExternalBTreeSearchOperatorDescriptor extends BTreeSearchOperatorDescriptor { private static final long serialVersionUID = 1L; + private final int version; - public ExternalBTreeSearchOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor recDesc, - IStorageManager storageManager, IIndexLifecycleManagerProvider lifecycleManagerProvider, - IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits, - IBinaryComparatorFactory[] comparatorFactories, int[] bloomFilterKeyFields, int[] lowKeyFields, - int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive, - IIndexDataflowHelperFactory dataflowHelperFactory, boolean retainInput, boolean retainMissing, - IMissingWriterFactory missingWriterFactory, ISearchOperationCallbackFactory searchOpCallbackProvider, - IPageManagerFactory pageManagerFactory) { - super(spec, recDesc, storageManager, lifecycleManagerProvider, fileSplitProvider, typeTraits, - comparatorFactories, bloomFilterKeyFields, lowKeyFields, highKeyFields, lowKeyInclusive, - highKeyInclusive, dataflowHelperFactory, retainInput, retainMissing, missingWriterFactory, - searchOpCallbackProvider, null, null, pageManagerFactory); + public ExternalBTreeSearchOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc, + int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive, + IIndexDataflowHelperFactory indexHelperFactory, boolean retainInput, boolean retainMissing, + IMissingWriterFactory missingWriterFactory, ISearchOperationCallbackFactory searchCallbackFactory, + int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, int version) { + super(spec, outRecDesc, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive, indexHelperFactory, + retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, 
minFilterFieldIndexes, + maxFilterFieldIndexes, false); + this.version = version; } @Override - public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, + public ExternalBTreeSearchOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException { - return new ExternalBTreeSearchOperatorNodePushable(this, ctx, partition, recordDescProvider, lowKeyFields, - highKeyFields, lowKeyInclusive, highKeyInclusive); + return new ExternalBTreeSearchOperatorNodePushable(ctx, partition, + recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), lowKeyFields, highKeyFields, + lowKeyInclusive, highKeyInclusive, indexHelperFactory, retainInput, retainMissing, missingWriterFactory, + searchCallbackFactory, version); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalBTreeSearchOperatorNodePushable.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalBTreeSearchOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalBTreeSearchOperatorNodePushable.java index 6e3b5d7..a966bbe 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalBTreeSearchOperatorNodePushable.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalBTreeSearchOperatorNodePushable.java @@ -23,33 +23,39 @@ import java.io.IOException; import org.apache.hyracks.api.comm.VSizeFrame; import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; +import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory; +import 
org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference; import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorNodePushable; -import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback; -import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeWithBuddyDataflowHelper; +import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory; +import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; import org.apache.hyracks.storage.am.lsm.btree.impls.ExternalBTreeWithBuddy; +import org.apache.hyracks.storage.common.ISearchOperationCallback; public class ExternalBTreeSearchOperatorNodePushable extends BTreeSearchOperatorNodePushable { - public ExternalBTreeSearchOperatorNodePushable(ExternalBTreeSearchOperatorDescriptor opDesc, - IHyracksTaskContext ctx, int partition, IRecordDescriptorProvider recordDescProvider, int[] lowKeyFields, - int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive) throws HyracksDataException { - super(opDesc, ctx, partition, recordDescProvider, lowKeyFields, highKeyFields, lowKeyInclusive, - highKeyInclusive, false, null, null); + private final int version; + + public ExternalBTreeSearchOperatorNodePushable(IHyracksTaskContext ctx, int partition, + RecordDescriptor intputRecDesc, int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, + boolean highKeyInclusive, IIndexDataflowHelperFactory indexHelperFactory, boolean retainInput, + boolean retainMissing, IMissingWriterFactory missingWriterFactory, + ISearchOperationCallbackFactory searchCallbackFactory, int version) throws HyracksDataException { + 
super(ctx, partition, intputRecDesc, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive, null, null, + indexHelperFactory, retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, false); + this.version = version; } // We override the open function to search a specific version of the index @Override public void open() throws HyracksDataException { writer.open(); - ExternalBTreeWithBuddyDataflowHelper dataFlowHelper = (ExternalBTreeWithBuddyDataflowHelper) indexHelper; accessor = new FrameTupleAccessor(inputRecDesc); - dataFlowHelper.open(); + indexHelper.open(); index = indexHelper.getIndexInstance(); if (retainMissing) { int fieldCount = getFieldCount(); @@ -72,10 +78,10 @@ public class ExternalBTreeSearchOperatorNodePushable extends BTreeSearchOperator tb = new ArrayTupleBuilder(recordDesc.getFieldCount()); dos = tb.getDataOutput(); appender = new FrameTupleAppender(new VSizeFrame(ctx)); - ISearchOperationCallback searchCallback = opDesc.getSearchOpCallbackFactory() - .createSearchOperationCallback(indexHelper.getResource().getId(), ctx, null); + ISearchOperationCallback searchCallback = + searchCallbackFactory.createSearchOperationCallback(indexHelper.getResource().getId(), ctx, null); // The next line is the reason we override this method - indexAccessor = externalIndex.createAccessor(searchCallback, dataFlowHelper.getTargetVersion()); + indexAccessor = externalIndex.createAccessor(searchCallback, version); cursor = createCursor(); if (retainInput) { frameTuple = new FrameTupleReference(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalDatasetIndexesAbortOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalDatasetIndexesAbortOperatorDescriptor.java 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalDatasetIndexesAbortOperatorDescriptor.java index 3ce0da8..20f0c55 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalDatasetIndexesAbortOperatorDescriptor.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalDatasetIndexesAbortOperatorDescriptor.java @@ -21,10 +21,12 @@ package org.apache.asterix.external.operators; import java.util.List; import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; +import org.apache.hyracks.api.io.IIOManager; import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; +import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper; import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; -import org.apache.hyracks.storage.am.common.util.IndexFileNameUtil; import org.apache.hyracks.storage.am.lsm.common.impls.AbortRecoverLSMIndexFileManager; public class ExternalDatasetIndexesAbortOperatorDescriptor extends AbstractExternalDatasetIndexesOperatorDescriptor { @@ -32,18 +34,17 @@ public class ExternalDatasetIndexesAbortOperatorDescriptor extends AbstractExter private static final long serialVersionUID = 1L; public ExternalDatasetIndexesAbortOperatorDescriptor(IOperatorDescriptorRegistry spec, - IIndexDataflowHelperFactory filesIndexDataflowHelperFactory, IndexInfoOperatorDescriptor fileIndexesInfo, - List<IIndexDataflowHelperFactory> indexesDataflowHelperFactories, - List<IndexInfoOperatorDescriptor> indexesInfos) { - super(spec, filesIndexDataflowHelperFactory, fileIndexesInfo, indexesDataflowHelperFactories, - indexesInfos); + List<IIndexDataflowHelperFactory> indexesDataflowHelperFactories) { + super(spec, indexesDataflowHelperFactories); } @Override - protected void 
performOpOnIndex(IIndexDataflowHelperFactory indexDataflowHelperFactory, IHyracksTaskContext ctx, - IndexInfoOperatorDescriptor fileIndexInfo, int partition) throws Exception { - FileReference file = IndexFileNameUtil.getIndexAbsoluteFileRef(fileIndexInfo, partition, ctx.getIOManager()); - AbortRecoverLSMIndexFileManager fileManager = new AbortRecoverLSMIndexFileManager(ctx.getIOManager(), file); + protected void performOpOnIndex(IIndexDataflowHelper indexDataflowHelper, IHyracksTaskContext ctx) + throws HyracksDataException { + String path = indexDataflowHelper.getResource().getPath(); + IIOManager ioManager = ctx.getIoManager(); + FileReference file = ioManager.resolve(path); + AbortRecoverLSMIndexFileManager fileManager = new AbortRecoverLSMIndexFileManager(ctx.getIoManager(), file); fileManager.deleteTransactionFiles(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalDatasetIndexesCommitOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalDatasetIndexesCommitOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalDatasetIndexesCommitOperatorDescriptor.java index 1559469..fe4f93c 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalDatasetIndexesCommitOperatorDescriptor.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalDatasetIndexesCommitOperatorDescriptor.java @@ -23,12 +23,12 @@ import java.util.List; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; +import org.apache.hyracks.api.io.IIOManager; import 
org.apache.hyracks.api.job.IOperatorDescriptorRegistry; -import org.apache.hyracks.storage.am.common.api.IIndex; import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper; import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; -import org.apache.hyracks.storage.am.common.util.IndexFileNameUtil; import org.apache.hyracks.storage.am.lsm.common.api.ITwoPCIndex; +import org.apache.hyracks.storage.common.IIndex; import org.apache.log4j.Logger; public class ExternalDatasetIndexesCommitOperatorDescriptor extends AbstractExternalDatasetIndexesOperatorDescriptor { @@ -37,26 +37,21 @@ public class ExternalDatasetIndexesCommitOperatorDescriptor extends AbstractExte Logger.getLogger(ExternalDatasetIndexesCommitOperatorDescriptor.class.getName()); public ExternalDatasetIndexesCommitOperatorDescriptor(IOperatorDescriptorRegistry spec, - IIndexDataflowHelperFactory filesIndexDataflowHelperFactory, IndexInfoOperatorDescriptor fileIndexesInfo, - List<IIndexDataflowHelperFactory> indexesDataflowHelperFactories, - List<IndexInfoOperatorDescriptor> indexesInfos) { - super(spec, filesIndexDataflowHelperFactory, fileIndexesInfo, indexesDataflowHelperFactories, indexesInfos); + List<IIndexDataflowHelperFactory> indexesDataflowHelperFactories) { + super(spec, indexesDataflowHelperFactories); } @Override - protected void performOpOnIndex(IIndexDataflowHelperFactory indexDataflowHelperFactory, IHyracksTaskContext ctx, - IndexInfoOperatorDescriptor fileIndexInfo, int partition) throws HyracksDataException { - FileReference resourecePath = - IndexFileNameUtil.getIndexAbsoluteFileRef(fileIndexInfo, partition, ctx.getIOManager()); - LOGGER.warn("performing the operation on " + resourecePath.getFile().getAbsolutePath()); - // Get DataflowHelper - IIndexDataflowHelper indexHelper = - indexDataflowHelperFactory.createIndexDataflowHelper(fileIndexInfo, ctx, partition); + protected void performOpOnIndex(IIndexDataflowHelper indexHelper, IHyracksTaskContext ctx) 
+ throws HyracksDataException { + String path = indexHelper.getResource().getPath(); + IIOManager ioManager = ctx.getIoManager(); + FileReference file = ioManager.resolve(path); + LOGGER.warn("performing the operation on " + file.getFile().getAbsolutePath()); // Get index IIndex index = indexHelper.getIndexInstance(); // commit transaction ((ITwoPCIndex) index).commitTransaction(); - LOGGER.warn("operation on " + resourecePath.getFile().getAbsolutePath() + " Succeded"); + LOGGER.warn("operation on " + file.getFile().getAbsolutePath() + " Succeded"); } - } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java index f74f4e9..5413e4d 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java @@ -21,10 +21,12 @@ package org.apache.asterix.external.operators; import java.util.List; import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; +import org.apache.hyracks.api.io.IIOManager; import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; +import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper; import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; -import 
org.apache.hyracks.storage.am.common.util.IndexFileNameUtil; import org.apache.hyracks.storage.am.lsm.common.impls.AbortRecoverLSMIndexFileManager; public class ExternalDatasetIndexesRecoverOperatorDescriptor extends AbstractExternalDatasetIndexesOperatorDescriptor { @@ -32,19 +34,17 @@ public class ExternalDatasetIndexesRecoverOperatorDescriptor extends AbstractExt private static final long serialVersionUID = 1L; public ExternalDatasetIndexesRecoverOperatorDescriptor(IOperatorDescriptorRegistry spec, - IIndexDataflowHelperFactory filesIndexDataflowHelperFactory, IndexInfoOperatorDescriptor fileIndexesInfo, - List<IIndexDataflowHelperFactory> indexesDataflowHelperFactories, - List<IndexInfoOperatorDescriptor> indexesInfos) { - super(spec, filesIndexDataflowHelperFactory, fileIndexesInfo, indexesDataflowHelperFactories, - indexesInfos); + List<IIndexDataflowHelperFactory> indexesDataflowHelperFactories) { + super(spec, indexesDataflowHelperFactories); } @Override - protected void performOpOnIndex(IIndexDataflowHelperFactory indexDataflowHelperFactory, IHyracksTaskContext ctx, - IndexInfoOperatorDescriptor fileIndexInfo, int partition) throws Exception { - FileReference file = IndexFileNameUtil.getIndexAbsoluteFileRef(fileIndexInfo, partition, ctx.getIOManager()); - AbortRecoverLSMIndexFileManager fileManager = new AbortRecoverLSMIndexFileManager(ctx.getIOManager(), file); + protected void performOpOnIndex(IIndexDataflowHelper indexDataflowHelper, IHyracksTaskContext ctx) + throws HyracksDataException { + String path = indexDataflowHelper.getResource().getPath(); + IIOManager ioManager = ctx.getIoManager(); + FileReference file = ioManager.resolve(path); + AbortRecoverLSMIndexFileManager fileManager = new AbortRecoverLSMIndexFileManager(ctx.getIoManager(), file); fileManager.recoverTransaction(); } - } 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexCreateOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexCreateOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexCreateOperatorDescriptor.java
new file mode 100644
index 0000000..09a3c47
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexCreateOperatorDescriptor.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.operators;
+
+import java.util.List;
+
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.indexing.FileIndexTupleTranslator;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.dataflow.std.base.AbstractOperatorNodePushable;
+import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import org.apache.hyracks.storage.am.btree.impls.BTree;
+import org.apache.hyracks.storage.am.common.api.IIndexBuilder;
+import org.apache.hyracks.storage.am.common.api.IIndexBuilderFactory;
+import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.IIndexBulkLoader;
+
+/**
+ * Operator descriptor for the replicated files index of an external dataset.
+ * <p>
+ * Its runtime builds the index, opens it, and bulk-loads one tuple per file of the initial set of files.
+ */
+public class ExternalFilesIndexCreateOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    // Assigned once in the constructor and only read afterwards, hence final.
+    private final List<ExternalFile> files;
+    private final IIndexDataflowHelperFactory dataflowHelperFactory;
+    private final IIndexBuilderFactory indexBuilderFactory;
+
+    /**
+     * @param spec
+     *            registry handed to the superclass constructor
+     * @param indexBuilderFactory
+     *            creates the builder used to build the index for a partition
+     * @param dataflowHelperFactory
+     *            creates the helper used to open and close the index for a partition
+     * @param files
+     *            the files to bulk-load; one tuple is added per file, in list order
+     */
+    public ExternalFilesIndexCreateOperatorDescriptor(IOperatorDescriptorRegistry spec,
+            IIndexBuilderFactory indexBuilderFactory, IIndexDataflowHelperFactory dataflowHelperFactory,
+            List<ExternalFile> files) {
+        // NOTE(review): the two zeros are presumably the input and output arity -- confirm against the superclass.
+        super(spec, 0, 0);
+        this.indexBuilderFactory = indexBuilderFactory;
+        this.dataflowHelperFactory = dataflowHelperFactory;
+        this.files = files;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+        return new AbstractOperatorNodePushable() {
+            /**
+             * Builds, opens and bulk-loads the index; all of the operator's work happens here.
+             */
+            @Override
+            public void initialize() throws HyracksDataException {
+                IIndexBuilder indexBuilder = indexBuilderFactory.create(ctx, partition);
+                IIndexDataflowHelper indexHelper = dataflowHelperFactory.create(ctx, partition);
+                FileIndexTupleTranslator filesTupleTranslator = new FileIndexTupleTranslator();
+                // Build the index
+                indexBuilder.build();
+                // Open the index
+                indexHelper.open();
+                try {
+                    IIndex index = indexHelper.getIndexInstance();
+                    // Create bulk loader
+                    IIndexBulkLoader bulkLoader =
+                            index.createBulkLoader(BTree.DEFAULT_FILL_FACTOR, false, files.size(), false);
+                    // Load files
+                    for (ExternalFile file : files) {
+                        bulkLoader.add(filesTupleTranslator.getTupleFromFile(file));
+                    }
+                    bulkLoader.end();
+                } finally {
+                    // Close the helper whether or not the load succeeded
+                    indexHelper.close();
+                }
+            }
+
+            @Override
+            public void deinitialize() throws HyracksDataException {
+                // Nothing to release: the index helper is already closed in initialize()
+            }
+
+            @Override
+            public int getInputArity() {
+                return 0;
+            }
+
+            @Override
+            public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc)
+                    throws HyracksDataException {
+                // Intentionally empty: the supplied writer is ignored
+            }
+
+            @Override
+            public IFrameWriter getInputFrameWriter(int index) {
+                // No input frame writer is provided
+                return null;
+            }
+
+        };
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexModificationOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexModificationOperatorDescriptor.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexModificationOperatorDescriptor.java
new file mode 100644
index 0000000..684cb15
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexModificationOperatorDescriptor.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.operators;
+
+import java.util.List;
+
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.indexing.FileIndexTupleTranslator;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.dataflow.std.base.AbstractOperatorNodePushable;
+import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import org.apache.hyracks.storage.am.btree.impls.BTree;
+import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.lsm.btree.impls.ExternalBTree;
+import org.apache.hyracks.storage.am.lsm.btree.impls.ExternalBTree.LSMTwoPCBTreeBulkLoader;
+import org.apache.hyracks.storage.common.IIndex;
+
+/**
+ * This operator is intended solely for the replicated files index of external datasets.
+ * <p>
+ * It bulk-modifies the index, creating a hidden transaction component which might later be committed or deleted.
+ */
+public class ExternalFilesIndexModificationOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    // Assigned once in the constructor and only read afterwards, hence final.
+    private final List<ExternalFile> files;
+    private final IIndexDataflowHelperFactory dataflowHelperFactory;
+
+    /**
+     * @param spec
+     *            registry handed to the superclass constructor
+     * @param dataflowHelperFactory
+     *            creates the helper used to open and close the index for a partition
+     * @param files
+     *            the files whose pending operations are applied to the index, in list order
+     */
+    public ExternalFilesIndexModificationOperatorDescriptor(IOperatorDescriptorRegistry spec,
+            IIndexDataflowHelperFactory dataflowHelperFactory, List<ExternalFile> files) {
+        // NOTE(review): the two zeros are presumably the input and output arity -- confirm against the superclass.
+        super(spec, 0, 0);
+        this.dataflowHelperFactory = dataflowHelperFactory;
+        this.files = files;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+        return new AbstractOperatorNodePushable() {
+
+            /**
+             * Opens the index and applies the pending operation of every file through a transaction bulk
+             * loader. If anything fails after the loader was created, the loader is aborted; the index helper
+             * is closed in every case.
+             */
+            @Override
+            public void initialize() throws HyracksDataException {
+                final IIndexDataflowHelper indexHelper = dataflowHelperFactory.create(ctx, partition);
+                FileIndexTupleTranslator filesTupleTranslator = new FileIndexTupleTranslator();
+                // Open the index
+                indexHelper.open();
+                LSMTwoPCBTreeBulkLoader bulkLoader = null;
+                try {
+                    // Fetched inside the try block so that the helper opened above is closed on any failure
+                    IIndex index = indexHelper.getIndexInstance();
+                    bulkLoader = (LSMTwoPCBTreeBulkLoader) ((ExternalBTree) index)
+                            .createTransactionBulkLoader(BTree.DEFAULT_FILL_FACTOR, false, files.size(), false);
+                    // Load files
+                    // The files must be ordered according to their numbers
+                    for (ExternalFile file : files) {
+                        switch (file.getPendingOp()) {
+                            case ADD_OP:
+                            case APPEND_OP:
+                                bulkLoader.add(filesTupleTranslator.getTupleFromFile(file));
+                                break;
+                            case DROP_OP:
+                                bulkLoader.delete(filesTupleTranslator.getTupleFromFile(file));
+                                break;
+                            case NO_OP:
+                                // Nothing pending for this file
+                                break;
+                            default:
+                                throw new HyracksDataException("Unknown pending op " + file.getPendingOp());
+                        }
+                    }
+                    bulkLoader.end();
+                } catch (Exception e) {
+                    // Abort only if the loader was actually created before the failure
+                    if (bulkLoader != null) {
+                        bulkLoader.abort();
+                    }
+                    throw HyracksDataException.create(e);
+                } finally {
+                    indexHelper.close();
+                }
+            }
+
+            @Override
+            public void deinitialize() throws HyracksDataException {
+                // Nothing to release: the index helper is already closed in initialize()
+            }
+
+            @Override
+            public int getInputArity() {
+                return 0;
+            }
+
+            @Override
+            public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc)
+                    throws HyracksDataException {
+                // Intentionally empty: the supplied writer is ignored
+            }
+
+            @Override
+            public IFrameWriter getInputFrameWriter(int index) {
+                // No input frame writer is provided
+                return null;
+            }
+
+        };
+    }
+}