http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java index 555f571..6ad85b3 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java @@ -85,6 +85,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker; import org.apache.hyracks.storage.am.lsm.common.impls.AsynchronousScheduler; import org.apache.hyracks.storage.am.lsm.common.impls.PrefixMergePolicyFactory; +import org.apache.hyracks.storage.common.ILocalResourceRepository; import org.apache.hyracks.storage.common.buffercache.BufferCache; import org.apache.hyracks.storage.common.buffercache.ClockPageReplacementStrategy; import org.apache.hyracks.storage.common.buffercache.DelayPageCleanerPolicy; @@ -95,7 +96,6 @@ import org.apache.hyracks.storage.common.buffercache.IPageCleanerPolicy; import org.apache.hyracks.storage.common.buffercache.IPageReplacementStrategy; import org.apache.hyracks.storage.common.file.IFileMapManager; import org.apache.hyracks.storage.common.file.IFileMapProvider; -import org.apache.hyracks.storage.common.file.ILocalResourceRepository; import org.apache.hyracks.storage.common.file.ILocalResourceRepositoryFactory; import org.apache.hyracks.storage.common.file.IResourceIdFactory; @@ -336,7 +336,7 @@ public class NCAppRuntimeContext implements INcApplicationContext { } @Override - public IIOManager getIOManager() { + public IIOManager getIoManager() { return ioManager; }
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java index 07f341d..691be50 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java @@ -44,6 +44,7 @@ import java.util.logging.Logger; import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.asterix.common.config.ReplicationProperties; +import org.apache.asterix.common.dataflow.DatasetLocalResource; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback; import org.apache.asterix.common.replication.IReplicaResourcesManager; @@ -56,7 +57,6 @@ import org.apache.asterix.common.transactions.IRecoveryManager; import org.apache.asterix.common.transactions.ITransactionContext; import org.apache.asterix.common.transactions.ITransactionSubsystem; import org.apache.asterix.common.transactions.LogType; -import org.apache.asterix.common.transactions.Resource; import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback; import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; import org.apache.asterix.transaction.management.service.logging.LogManager; @@ -67,12 +67,12 @@ import org.apache.commons.io.FileUtils; import org.apache.hyracks.api.application.INCServiceContext; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.lifecycle.ILifeCycleComponent; -import org.apache.hyracks.storage.am.common.api.IIndex; import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex; -import org.apache.hyracks.storage.common.file.LocalResource; +import org.apache.hyracks.storage.common.IIndex; +import org.apache.hyracks.storage.common.LocalResource; /** * This is the Recovery Manager and is responsible for rolling back a @@ -307,7 +307,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { long lsn = -1; ILSMIndex index = null; LocalResource localResource = null; - Resource localResourceMetadata = null; + DatasetLocalResource localResourceMetadata = null; boolean foundWinner = false; JobEntityCommits jobEntityWinners = null; @@ -359,6 +359,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { * log record. *******************************************************************/ if (localResource == null) { + LOGGER.log(Level.WARNING, "resource was not found for resource id " + resourceId); logRecord = logReader.next(); continue; } @@ -368,11 +369,11 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { //if index is not registered into IndexLifeCycleManager, //create the index using LocalMetadata stored in LocalResourceRepository //get partition path in this node - localResourceMetadata = (Resource) localResource.getResource(); + localResourceMetadata = (DatasetLocalResource) localResource.getResource(); index = (ILSMIndex) datasetLifecycleManager.get(localResource.getPath()); if (index == null) { //#. create index instance and register to indexLifeCycleManager - index = localResourceMetadata.createIndexInstance(serviceCtx, localResource); + index = (ILSMIndex) localResourceMetadata.createInstance(serviceCtx); datasetLifecycleManager.register(localResource.getPath(), index); datasetLifecycleManager.open(localResource.getPath()); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java index 0e6cf9a..9f86a26 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java @@ -81,7 +81,7 @@ public class TransactionSubsystem implements ITransactionSubsystem { if (latestCheckpoint != null && latestCheckpoint.getStorageVersion() != StorageConstants.VERSION) { throw new IllegalStateException( String.format("Storage version mismatch. Current version (%s). On disk version: (%s)", - latestCheckpoint.getStorageVersion(), StorageConstants.VERSION)); + StorageConstants.VERSION, latestCheckpoint.getStorageVersion())); } if (replicationEnabled) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index e0648ce..5047bc2 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -524,7 +524,6 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen metaItemTypeDataverseName + "." + metaItemTypeName, nodegroupName, compactionPolicy, dataverseName + "." + datasetName, defaultCompactionPolicy); Dataset dataset = null; - Index primaryIndex = null; try { IDatasetDetails datasetDetails = null; Dataset ds = metadataProvider.findDataset(dataverseName, datasetName); @@ -612,7 +611,6 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen datasetDetails, dd.getHints(), dsType, DatasetIdFactory.generateDatasetId(), MetadataUtil.PENDING_ADD_OP); MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), dataset); - primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, datasetName); if (dd.getDatasetType() == DatasetType.INTERNAL) { Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dataverseName); @@ -653,7 +651,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen bActiveTxn = true; metadataProvider.setMetadataTxnContext(mdTxnCtx); try { - JobSpecification jobSpec = DatasetUtil.dropDatasetJobSpec(dataset, primaryIndex, metadataProvider); + JobSpecification jobSpec = DatasetUtil.dropDatasetJobSpec(dataset, metadataProvider); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); bActiveTxn = false; JobUtils.runJob(hcc, jobSpec, true); @@ -872,7 +870,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen || stmtCreateIndex.getIndexType() == IndexType.SINGLE_PARTITION_NGRAM_INVIX || stmtCreateIndex.getIndexType() == IndexType.LENGTH_PARTITIONED_WORD_INVIX || stmtCreateIndex.getIndexType() == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX) { - List<List<String>> partitioningKeys = DatasetUtil.getPartitioningKeys(ds); + List<List<String>> partitioningKeys = ds.getPrimaryKeys(); for (List<String> partitioningKey : partitioningKeys) { IAType keyType = aRecordType.getSubFieldType(partitioningKey); ITypeTraits typeTrait = TypeTraitProvider.INSTANCE.getTypeTrait(keyType); @@ -933,8 +931,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file); } // This is the first index for the external dataset, replicate the files index - spec = ExternalIndexingOperations.buildFilesIndexReplicationJobSpec(ds, externalFilesSnapshot, - metadataProvider, true); + spec = ExternalIndexingOperations.buildFilesIndexCreateJobSpec(ds, externalFilesSnapshot, + metadataProvider); if (spec == null) { throw new CompilationException( "Failed to create job spec for replicating Files Index For external dataset"); @@ -1213,9 +1211,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen IndexUtil.buildDropIndexJobSpec(indexes.get(k), metadataProvider, datasets.get(j))); } } - Index primaryIndex = - MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, datasetName); - jobsToExecute.add(DatasetUtil.dropDatasetJobSpec(datasets.get(j), primaryIndex, metadataProvider)); + jobsToExecute.add(DatasetUtil.dropDatasetJobSpec(datasets.get(j), metadataProvider)); } else { // External dataset List<Index> indexes = @@ -2268,14 +2264,10 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen MetadataProvider metadataProvider, ARecordType enforcedType, ARecordType enforcedMeta) throws AlgebricksException { for (int j = 0; j < indexes.size(); j++) { - if (!ExternalIndexingOperations.isFileIndex(indexes.get(j))) { - jobsToExecute.add(IndexUtil.buildSecondaryIndexCompactJobSpec(ds, indexes.get(j), aRecordType, - metaRecordType, enforcedType, enforcedMeta, metadataProvider)); - } + jobsToExecute.add(IndexUtil.buildSecondaryIndexCompactJobSpec(ds, indexes.get(j), aRecordType, + metaRecordType, enforcedType, enforcedMeta, metadataProvider)); } - jobsToExecute.add(ExternalIndexingOperations.compactFilesIndexJobSpec(ds, metadataProvider, - new StorageComponentProvider())); } private interface IMetadataLocker { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/StorageComponentProvider.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/StorageComponentProvider.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/StorageComponentProvider.java index d0d14c8..8326fe2 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/StorageComponentProvider.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/StorageComponentProvider.java @@ -26,7 +26,6 @@ import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider; import org.apache.asterix.formats.nontagged.TypeTraitProvider; import org.apache.asterix.runtime.utils.RuntimeComponentsProvider; import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider; -import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider; import org.apache.hyracks.storage.am.common.api.IMetadataPageManagerFactory; import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory; import org.apache.hyracks.storage.am.common.freepage.AppendOnlyLinkedMetadataPageManagerFactory; @@ -66,11 +65,6 @@ public class StorageComponentProvider implements IStorageComponentProvider { } @Override - public IIndexLifecycleManagerProvider getIndexLifecycleManagerProvider() { - return RuntimeComponentsProvider.RUNTIME_PROVIDER; - } - - @Override public IStorageManager getStorageManager() { return RuntimeComponentsProvider.RUNTIME_PROVIDER; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java index ec01e1c..26952ad 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java @@ -205,7 +205,7 @@ public class NCApplication extends BaseNCApplication { private void performLocalCleanUp() { //Delete working area files from failed jobs - runtimeContext.getIOManager().deleteWorkspaceFiles(); + runtimeContext.getIoManager().deleteWorkspaceFiles(); //Reclaim storage for temporary datasets. String storageDirName = ClusterProperties.INSTANCE.getStorageDirectoryName(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java index d1ff871..6155450 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java @@ -214,8 +214,8 @@ public class FeedOperations { JobSpecification subJob = jobsList.get(iter1); operatorIdMapping.clear(); Map<OperatorDescriptorId, IOperatorDescriptor> operatorsMap = subJob.getOperatorMap(); - FeedConnectionId feedConnectionId = - new FeedConnectionId(ingestionOp.getEntityId(), feedConnections.get(iter1).getDatasetName()); + String datasetName = feedConnections.get(iter1).getDatasetName(); + FeedConnectionId feedConnectionId = new FeedConnectionId(ingestionOp.getEntityId(), datasetName); FeedPolicyEntity feedPolicyEntity = FeedMetadataUtil.validateIfPolicyExists(curFeedConnection.getDataverseName(), @@ -227,9 +227,8 @@ public class FeedOperations { OperatorDescriptorId opId = null; if (opDesc instanceof LSMTreeInsertDeleteOperatorDescriptor && ((LSMTreeInsertDeleteOperatorDescriptor) opDesc).isPrimary()) { - String operandId = ((LSMTreeInsertDeleteOperatorDescriptor) opDesc).getIndexName(); metaOp = new FeedMetaOperatorDescriptor(jobSpec, feedConnectionId, opDesc, - feedPolicyEntity.getProperties(), FeedRuntimeType.STORE, operandId); + feedPolicyEntity.getProperties(), FeedRuntimeType.STORE); opId = metaOp.getOperatorId(); opDesc.setOperatorId(opId); } else { @@ -243,7 +242,7 @@ public class FeedOperations { // anything on the network interface needs to be message compatible if (connectorDesc instanceof MToNPartitioningConnectorDescriptor) { metaOp = new FeedMetaOperatorDescriptor(jobSpec, feedConnectionId, opDesc, - feedPolicyEntity.getProperties(), FeedRuntimeType.COMPUTE, null); + feedPolicyEntity.getProperties(), FeedRuntimeType.COMPUTE); opId = metaOp.getOperatorId(); opDesc.setOperatorId(opId); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java index cdb9b5b..856638f 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java @@ -21,6 +21,7 @@ package org.apache.asterix.app.bootstrap; import java.io.File; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.logging.Logger; @@ -33,10 +34,9 @@ import org.apache.asterix.common.config.TransactionProperties; import org.apache.asterix.common.context.DatasetLifecycleManager; import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.context.TransactionSubsystemProvider; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable; -import org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor; import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType; -import org.apache.asterix.common.transactions.IResourceFactory; import org.apache.asterix.common.transactions.ITransactionManager; import org.apache.asterix.file.StorageComponentProvider; import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider; @@ -48,16 +48,14 @@ import org.apache.asterix.metadata.entities.Dataverse; import org.apache.asterix.metadata.entities.Index; import org.apache.asterix.metadata.utils.DatasetUtil; import org.apache.asterix.metadata.utils.MetadataUtil; +import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil; import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.IAType; import org.apache.asterix.runtime.formats.NonTaggedDataFormat; import org.apache.asterix.runtime.utils.CcApplicationContext; -import org.apache.asterix.runtime.utils.RuntimeComponentsProvider; import org.apache.asterix.test.runtime.ExecutionTestUtil; import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation; import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory; -import org.apache.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadataFactory; -import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider; import org.apache.asterix.transaction.management.runtime.CommitRuntime; import org.apache.asterix.transaction.management.service.logging.LogReader; import org.apache.commons.lang3.tuple.Pair; @@ -68,6 +66,8 @@ import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.comm.VSizeFrame; import org.apache.hyracks.api.context.IHyracksJobletContext; import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.ActivityId; +import org.apache.hyracks.api.dataflow.OperatorDescriptorId; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; @@ -75,7 +75,6 @@ import org.apache.hyracks.api.dataflow.value.ITypeTraits; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileSplit; -import org.apache.hyracks.api.io.ManagedFileSplit; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.api.util.HyracksConstants; @@ -83,17 +82,17 @@ import org.apache.hyracks.dataflow.common.utils.TaskUtil; import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider; import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor; import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorNodePushable; +import org.apache.hyracks.storage.am.common.api.IIndexBuilder; import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper; import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory; +import org.apache.hyracks.storage.am.common.build.IndexBuilderFactory; import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; -import org.apache.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor; +import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory; import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory; import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory; import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory; -import org.apache.hyracks.storage.common.file.ILocalResourceFactoryProvider; -import org.apache.hyracks.storage.common.file.LocalResource; +import org.apache.hyracks.storage.common.IResourceFactory; import org.apache.hyracks.test.support.TestUtils; import org.apache.hyracks.util.file.FileUtil; import org.mockito.Mockito; @@ -175,14 +174,13 @@ public class TestNodeController { new PrimaryIndexModificationOperationCallbackFactory(getTxnJobId(), dataset.getDatasetId(), primaryIndexInfo.primaryKeyIndexes, TXN_SUBSYSTEM_PROVIDER, Operation.get(op), ResourceType.LSM_BTREE); - LSMTreeInsertDeleteOperatorDescriptor indexOpDesc = - getInsertOpratorDesc(primaryIndexInfo, modOpCallbackFactory); - IIndexDataflowHelperFactory dataflowHelperFactory = - getPrimaryIndexDataflowHelperFactory(ctx, primaryIndexInfo, storageComponentProvider, dataset); - Mockito.when(indexOpDesc.getIndexDataflowHelperFactory()).thenReturn(dataflowHelperFactory); IRecordDescriptorProvider recordDescProvider = primaryIndexInfo.getInsertRecordDescriptorProvider(); - LSMInsertDeleteOperatorNodePushable insertOp = new LSMInsertDeleteOperatorNodePushable(indexOpDesc, ctx, - PARTITION, primaryIndexInfo.primaryIndexInsertFieldsPermutations, recordDescProvider, op, true); + IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory( + storageComponentProvider.getStorageManager(), primaryIndexInfo.fileSplitProvider); + LSMInsertDeleteOperatorNodePushable insertOp = new LSMInsertDeleteOperatorNodePushable(ctx, PARTITION, + primaryIndexInfo.primaryIndexInsertFieldsPermutations, + recordDescProvider.getInputRecordDescriptor(new ActivityId(new OperatorDescriptorId(0), 0), 0), op, + true, indexHelperFactory, modOpCallbackFactory, null); CommitRuntime commitOp = new CommitRuntime(ctx, getTxnJobId(), dataset.getDatasetId(), primaryIndexInfo.primaryKeyIndexes, false, true, PARTITION, true); insertOp.setOutputFrameWriter(0, commitOp, primaryIndexInfo.rDesc); @@ -200,18 +198,13 @@ public class TestNodeController { PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType, mergePolicyFactory, mergePolicyProperties, filterFields, primaryKeyIndexes, primaryKeyIndicators, storageComponentProvider); - IIndexDataflowHelperFactory indexDataflowHelperFactory = - getPrimaryIndexDataflowHelperFactory(ctx, primaryIndexInfo, storageComponentProvider, dataset); + IIndexDataflowHelperFactory indexDataflowHelperFactory = new IndexDataflowHelperFactory( + storageComponentProvider.getStorageManager(), primaryIndexInfo.fileSplitProvider); BTreeSearchOperatorDescriptor searchOpDesc = new BTreeSearchOperatorDescriptor(spec, primaryIndexInfo.rDesc, - RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER, - primaryIndexInfo.fileSplitProvider, primaryIndexInfo.primaryIndexTypeTraits, - primaryIndexInfo.primaryIndexComparatorFactories, primaryIndexInfo.primaryIndexBloomFilterKeyFields, - primaryIndexInfo.primaryKeyIndexes, primaryIndexInfo.primaryKeyIndexes, true, true, - indexDataflowHelperFactory, false, false, null, NoOpOperationCallbackFactory.INSTANCE, filterFields, - filterFields, storageComponentProvider.getMetadataPageManagerFactory()); - BTreeSearchOperatorNodePushable searchOp = new BTreeSearchOperatorNodePushable(searchOpDesc, ctx, 0, - primaryIndexInfo.getSearchRecordDescriptorProvider(), /*primaryIndexInfo.primaryKeyIndexes*/null, - /*primaryIndexInfo.primaryKeyIndexes*/null, true, true, false, filterFields, filterFields); + null, null, true, true, indexDataflowHelperFactory, false, false, null, + NoOpOperationCallbackFactory.INSTANCE, filterFields, filterFields, false); + BTreeSearchOperatorNodePushable searchOp = + searchOpDesc.createPushRuntime(ctx, primaryIndexInfo.getSearchRecordDescriptorProvider(), PARTITION, 1); emptyTupleOp.setFrameWriter(0, searchOp, primaryIndexInfo.getSearchRecordDescriptorProvider().getInputRecordDescriptor(null, 0)); searchOp.setOutputFrameWriter(0, countOp, primaryIndexInfo.rDesc); @@ -227,76 +220,8 @@ public class TestNodeController { return jobId; } - public LSMTreeInsertDeleteOperatorDescriptor getInsertOpratorDesc(PrimaryIndexInfo primaryIndexInfo, - IModificationOperationCallbackFactory modOpCallbackFactory) { - LSMTreeInsertDeleteOperatorDescriptor indexOpDesc = Mockito.mock(LSMTreeInsertDeleteOperatorDescriptor.class); - Mockito.when(indexOpDesc.getLifecycleManagerProvider()).thenReturn(RuntimeComponentsProvider.RUNTIME_PROVIDER); - Mockito.when(indexOpDesc.getStorageManager()).thenReturn(RuntimeComponentsProvider.RUNTIME_PROVIDER); - Mockito.when(indexOpDesc.getFileSplitProvider()).thenReturn(primaryIndexInfo.fileSplitProvider); - Mockito.when(indexOpDesc.getLocalResourceFactoryProvider()) - .thenReturn(primaryIndexInfo.localResourceFactoryProvider); - Mockito.when(indexOpDesc.getTreeIndexTypeTraits()).thenReturn(primaryIndexInfo.primaryIndexTypeTraits); - Mockito.when(indexOpDesc.getTreeIndexComparatorFactories()) - .thenReturn(primaryIndexInfo.primaryIndexComparatorFactories); - Mockito.when(indexOpDesc.getTreeIndexBloomFilterKeyFields()) - .thenReturn(primaryIndexInfo.primaryIndexBloomFilterKeyFields); - Mockito.when(indexOpDesc.getModificationOpCallbackFactory()).thenReturn(modOpCallbackFactory); - Mockito.when(indexOpDesc.getPageManagerFactory()) - .thenReturn(primaryIndexInfo.storageComponentProvider.getMetadataPageManagerFactory()); - return indexOpDesc; - } - - public TreeIndexCreateOperatorDescriptor getIndexCreateOpDesc(PrimaryIndexInfo primaryIndexInfo) { - TreeIndexCreateOperatorDescriptor indexOpDesc = Mockito.mock(TreeIndexCreateOperatorDescriptor.class); - Mockito.when(indexOpDesc.getLifecycleManagerProvider()).thenReturn(RuntimeComponentsProvider.RUNTIME_PROVIDER); - Mockito.when(indexOpDesc.getStorageManager()).thenReturn(RuntimeComponentsProvider.RUNTIME_PROVIDER); - Mockito.when(indexOpDesc.getFileSplitProvider()).thenReturn(primaryIndexInfo.fileSplitProvider); - Mockito.when(indexOpDesc.getLocalResourceFactoryProvider()) - .thenReturn(primaryIndexInfo.localResourceFactoryProvider); - Mockito.when(indexOpDesc.getTreeIndexTypeTraits()).thenReturn(primaryIndexInfo.primaryIndexTypeTraits); - Mockito.when(indexOpDesc.getTreeIndexComparatorFactories()) - .thenReturn(primaryIndexInfo.primaryIndexComparatorFactories); - Mockito.when(indexOpDesc.getTreeIndexBloomFilterKeyFields()) - .thenReturn(primaryIndexInfo.primaryIndexBloomFilterKeyFields); - Mockito.when(indexOpDesc.getPageManagerFactory()) - .thenReturn(primaryIndexInfo.storageComponentProvider.getMetadataPageManagerFactory()); - return indexOpDesc; - } - - public ConstantFileSplitProvider getFileSplitProvider(Dataset dataset) { - FileSplit fileSplit = new ManagedFileSplit(ExecutionTestUtil.integrationUtil.ncs[0].getId(), - dataset.getDataverseName() + File.separator + dataset.getDatasetName()); - return new ConstantFileSplitProvider(new FileSplit[] { fileSplit }); - } - - public ILocalResourceFactoryProvider getPrimaryIndexLocalResourceMetadataProvider( - IStorageComponentProvider storageComponentProvider, Index index, Dataset dataset, - ITypeTraits[] primaryIndexTypeTraits, IBinaryComparatorFactory[] primaryIndexComparatorFactories, - int[] primaryIndexBloomFilterKeyFields, ILSMMergePolicyFactory mergePolicyFactory, - Map<String, String> mergePolicyProperties, ITypeTraits[] filterTypeTraits, - IBinaryComparatorFactory[] filterCmpFactories, int[] btreeFields, int[] filterFields, - ILSMOperationTrackerFactory opTrackerProvider) throws AlgebricksException { - IResourceFactory localResourceMetadata = new LSMBTreeLocalResourceMetadataFactory(primaryIndexTypeTraits, - primaryIndexComparatorFactories, primaryIndexBloomFilterKeyFields, true, dataset.getDatasetId(), - mergePolicyFactory, mergePolicyProperties, filterTypeTraits, filterCmpFactories, btreeFields, - filterFields, opTrackerProvider, dataset.getIoOperationCallbackFactory(index), - storageComponentProvider.getMetadataPageManagerFactory()); - ILocalResourceFactoryProvider localResourceFactoryProvider = - new PersistentLocalResourceFactoryProvider(localResourceMetadata, LocalResource.LSMBTreeResource); - return localResourceFactoryProvider; - } - - public IIndexDataflowHelper getPrimaryIndexDataflowHelper(IHyracksTaskContext ctx, - PrimaryIndexInfo primaryIndexInfo, TreeIndexCreateOperatorDescriptor indexOpDesc, - IStorageComponentProvider storageComponentProvider, Dataset dataset) - throws AlgebricksException, HyracksDataException { - return getPrimaryIndexDataflowHelperFactory(ctx, primaryIndexInfo, storageComponentProvider, dataset) - .createIndexDataflowHelper(indexOpDesc, ctx, PARTITION); - } - - public IIndexDataflowHelperFactory getPrimaryIndexDataflowHelperFactory(IHyracksTaskContext ctx, - PrimaryIndexInfo primaryIndexInfo, IStorageComponentProvider storageComponentProvider, Dataset dataset) - throws AlgebricksException { + public IResourceFactory getPrimaryResourceFactory(IHyracksTaskContext ctx, PrimaryIndexInfo primaryIndexInfo, + IStorageComponentProvider storageComponentProvider, Dataset dataset) throws AlgebricksException { Dataverse dataverse = new Dataverse(dataset.getDataverseName(), NonTaggedDataFormat.class.getName(), MetadataUtil.PENDING_NO_OP); Index index = primaryIndexInfo.getIndex(); @@ -304,27 +229,13 @@ public class TestNodeController { (CcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext(); MetadataProvider mdProvider = new MetadataProvider(appCtx, dataverse, storageComponentProvider); try { - return dataset.getIndexDataflowHelperFactory(mdProvider, index, primaryIndexInfo.recordType, - primaryIndexInfo.metaType, primaryIndexInfo.mergePolicyFactory, - primaryIndexInfo.mergePolicyProperties); + return dataset.getResourceFactory(mdProvider, index, primaryIndexInfo.recordType, primaryIndexInfo.metaType, + primaryIndexInfo.mergePolicyFactory, primaryIndexInfo.mergePolicyProperties); } finally { mdProvider.getLocks().unlock(); } } - public IIndexDataflowHelper getPrimaryIndexDataflowHelper(Dataset dataset, IAType[] primaryKeyTypes, - ARecordType recordType, ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory, - Map<String, String> mergePolicyProperties, int[] filterFields, - IStorageComponentProvider storageComponentProvider, int[] primaryKeyIndexes, - List<Integer> primaryKeyIndicators) throws AlgebricksException, HyracksDataException { - PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType, - mergePolicyFactory, mergePolicyProperties, filterFields, primaryKeyIndexes, primaryKeyIndicators, - storageComponentProvider); - TreeIndexCreateOperatorDescriptor indexOpDesc = getIndexCreateOpDesc(primaryIndexInfo); - return getPrimaryIndexDataflowHelper(createTestContext(true), primaryIndexInfo, indexOpDesc, - storageComponentProvider, dataset); - } - public void createPrimaryIndex(Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties, int[] filterFields, IStorageComponentProvider storageComponentProvider, int[] primaryKeyIndexes, @@ -332,10 +243,23 @@ public class TestNodeController { PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType, mergePolicyFactory, mergePolicyProperties, filterFields, primaryKeyIndexes, primaryKeyIndicators, storageComponentProvider); - TreeIndexCreateOperatorDescriptor indexOpDesc = getIndexCreateOpDesc(primaryIndexInfo); - IIndexDataflowHelper dataflowHelper = getPrimaryIndexDataflowHelper(createTestContext(true), primaryIndexInfo, - indexOpDesc, storageComponentProvider, dataset); - dataflowHelper.create(); + Dataverse dataverse = new Dataverse(dataset.getDataverseName(), NonTaggedDataFormat.class.getName(), + MetadataUtil.PENDING_NO_OP); + MetadataProvider mdProvider = new MetadataProvider( + (ICcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext(), dataverse, + storageComponentProvider); + try { + IResourceFactory resourceFactory = dataset.getResourceFactory(mdProvider, primaryIndexInfo.index, + recordType, metaType, mergePolicyFactory, mergePolicyProperties); + IndexBuilderFactory indexBuilderFactory = + new IndexBuilderFactory(storageComponentProvider.getStorageManager(), + primaryIndexInfo.fileSplitProvider, resourceFactory, !dataset.isTemp()); + IHyracksTaskContext ctx = createTestContext(false); + IIndexBuilder indexBuilder = indexBuilderFactory.create(ctx, 0); + indexBuilder.build(); + } finally { + mdProvider.getLocks().unlock(); + } } private int[] createPrimaryIndexBloomFilterFields(int length) { @@ -392,7 +316,7 @@ public class TestNodeController { } ctx = Mockito.spy(ctx); Mockito.when(ctx.getJobletContext()).thenReturn(jobletCtx); - Mockito.when(ctx.getIOManager()).thenReturn(ExecutionTestUtil.integrationUtil.ncs[0].getIoManager()); + Mockito.when(ctx.getIoManager()).thenReturn(ExecutionTestUtil.integrationUtil.ncs[0].getIoManager()); return ctx; } @@ -430,7 +354,6 @@ public class TestNodeController { private ITypeTraits[] filterTypeTraits; private IBinaryComparatorFactory[] filterCmpFactories; private int[] btreeFields; - private ILocalResourceFactoryProvider localResourceFactoryProvider; private ConstantFileSplitProvider fileSplitProvider; private RecordDescriptor rDesc; private int[] primaryIndexInsertFieldsPermutations; @@ -463,7 +386,6 @@ public class TestNodeController { filterCmpFactories = DatasetUtil.computeFilterBinaryComparatorFactories(dataset, recordType, NonTaggedDataFormat.INSTANCE.getBinaryComparatorFactoryProvider()); btreeFields = DatasetUtil.createBTreeFieldsWhenThereisAFilter(dataset); - fileSplitProvider = getFileSplitProvider(dataset); primaryIndexSerdes = createPrimaryIndexSerdes(primaryIndexNumOfTupleFields, primaryKeyTypes, recordType, metaType); rDesc = new RecordDescriptor(primaryIndexSerdes, primaryIndexTypeTraits); @@ -483,10 +405,9 @@ public class TestNodeController { index = new Index(dataset.getDataverseName(), dataset.getDatasetName(), dataset.getDatasetName(), IndexType.BTREE, keyFieldNames, keyFieldSourceIndicators, keyFieldTypes, false, true, MetadataUtil.PENDING_NO_OP); - localResourceFactoryProvider = getPrimaryIndexLocalResourceMetadataProvider(storageComponentProvider, index, - dataset, primaryIndexTypeTraits, primaryIndexComparatorFactories, primaryIndexBloomFilterKeyFields, - mergePolicyFactory, mergePolicyProperties, filterTypeTraits, filterCmpFactories, btreeFields, - filterFields, dataset.getIndexOperationTrackerFactory(index)); + List<String> nodes = Collections.singletonList(ExecutionTestUtil.integrationUtil.ncs[0].getId()); + FileSplit[] splits = SplitsAndConstraintsUtil.getDatasetSplits(dataset, nodes, index.getIndexName(), false); + fileSplitProvider = new ConstantFileSplitProvider(Arrays.copyOfRange(splits, 0, 1)); } public Index getIndex() { @@ -523,4 +444,21 @@ public class TestNodeController { createPrimaryIndexSerdes(primaryIndexNumOfTupleFields, keyTypes, recordType, metaType); return new RecordDescriptor(primaryIndexSerdes, primaryIndexTypeTraits); } + + public IndexDataflowHelperFactory getPrimaryIndexDataflowHelperFactory(PrimaryIndexInfo primaryIndexInfo, + IStorageComponentProvider storageComponentProvider) throws AlgebricksException { + return new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), primaryIndexInfo.fileSplitProvider); + } + + public IIndexDataflowHelper getPrimaryIndexDataflowHelper(Dataset dataset, IAType[] primaryKeyTypes, + ARecordType recordType, ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory, + Map<String, String> mergePolicyProperties, int[] filterFields, + IStorageComponentProvider storageComponentProvider, int[] primaryKeyIndexes, + List<Integer> primaryKeyIndicators) throws AlgebricksException, HyracksDataException { + PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType, + mergePolicyFactory, mergePolicyProperties, filterFields, primaryKeyIndexes, primaryKeyIndicators, + storageComponentProvider); + return getPrimaryIndexDataflowHelperFactory(primaryIndexInfo, storageComponentProvider) + .create(createTestContext(true), PARTITION); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java index 1bcc88a..ef72c67 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java @@ -18,6 +18,7 @@ */ package org.apache.asterix.test.dataflow; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -105,9 +106,11 @@ public class LogMarkerTest { TestNodeController nc = new TestNodeController(null, false); nc.init(); StorageComponentProvider storageManager = new StorageComponentProvider(); + List<List<String>> partitioningKeys = new ArrayList<>(); + partitioningKeys.add(Collections.singletonList("key")); Dataset dataset = new Dataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME, null, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH, - Collections.emptyList(), null, null, null, false, null, false), + partitioningKeys, null, null, null, false, null, false), null, DatasetType.INTERNAL, DATASET_ID, 0); try { nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null, http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java index 35c3c42..d2bf3d3 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java @@ -19,6 +19,7 @@ package org.apache.asterix.test.logging; import java.io.File; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -112,9 +113,11 @@ public class CheckpointingTest { TestNodeController nc = new TestNodeController(new File(TEST_CONFIG_FILE_PATH).getAbsolutePath(), false); StorageComponentProvider storageManager = new StorageComponentProvider(); nc.init(); + List<List<String>> partitioningKeys = new ArrayList<>(); + partitioningKeys.add(Collections.singletonList("key")); Dataset dataset = new Dataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME, null, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH, - Collections.emptyList(), null, null, null, false, null, false), + partitioningKeys, null, null, null, false, null, false), null, DatasetType.INTERNAL, DATASET_ID, 0); try { nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null, http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java index ae19f23..a32d4dc 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java @@ -25,9 +25,9 @@ import org.apache.asterix.common.context.IndexInfo; import org.apache.asterix.common.context.PrimaryIndexOperationTracker; import org.apache.asterix.common.replication.IReplicationStrategy; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.storage.am.common.api.IIndex; -import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager; import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache; +import org.apache.hyracks.storage.common.IIndex; +import org.apache.hyracks.storage.common.IResourceLifecycleManager; public interface IDatasetLifecycleManager extends IResourceLifecycleManager<IIndex> { /** @@ -73,19 +73,19 @@ public interface IDatasetLifecycleManager extends IResourceLifecycleManager<IInd /** * creates (if necessary) and returns the primary index operation tracker of a dataset. * - * @param datasetID + * @param datasetId * @return */ - PrimaryIndexOperationTracker getOperationTracker(int datasetID); + PrimaryIndexOperationTracker getOperationTracker(int datasetId); /** * creates (if necessary) and returns the dataset virtual buffer caches. * - * @param datasetID + * @param datasetId * @param ioDeviceNum * @return */ - List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID, int ioDeviceNum); + List<IVirtualBufferCache> getVirtualBufferCaches(int datasetId, int ioDeviceNum); /** * Flushes then closes all open datasets http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java index 5e69746..a4b994b 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java @@ -36,14 +36,14 @@ import org.apache.hyracks.api.io.IIOManager; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker; +import org.apache.hyracks.storage.common.ILocalResourceRepository; import org.apache.hyracks.storage.common.buffercache.IBufferCache; import org.apache.hyracks.storage.common.file.IFileMapProvider; -import org.apache.hyracks.storage.common.file.ILocalResourceRepository; import org.apache.hyracks.storage.common.file.IResourceIdFactory; public interface INcApplicationContext extends IApplicationContext { - IIOManager getIOManager(); + IIOManager getIoManager(); Executor getThreadExecutor(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/AsterixVirtualBufferCacheProvider.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/AsterixVirtualBufferCacheProvider.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/AsterixVirtualBufferCacheProvider.java index f122096..84e7ed5 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/AsterixVirtualBufferCacheProvider.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/AsterixVirtualBufferCacheProvider.java @@ -21,32 +21,33 @@ package org.apache.asterix.common.context; import java.util.List; import org.apache.asterix.common.api.INcApplicationContext; -import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.application.INCServiceContext; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; -import org.apache.hyracks.api.io.FileSplit; import org.apache.hyracks.api.io.IIOManager; import org.apache.hyracks.api.io.IODeviceHandle; -import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache; import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider; public class AsterixVirtualBufferCacheProvider implements IVirtualBufferCacheProvider { private static final long serialVersionUID = 1L; - private final int datasetID; + private final int datasetId; - public AsterixVirtualBufferCacheProvider(int datasetID) { - this.datasetID = datasetID; + public AsterixVirtualBufferCacheProvider(int datasetId) { + this.datasetId = datasetId; } @Override - public List<IVirtualBufferCache> getVirtualBufferCaches(IHyracksTaskContext ctx, - IFileSplitProvider fileSplitProvider) throws HyracksDataException { - final int partition = ctx.getTaskAttemptId().getTaskId().getPartition(); - IIOManager ioManager = ctx.getIOManager(); - FileSplit fileSplit = fileSplitProvider.getFileSplits()[partition]; - FileReference fileRef = fileSplit.getFileReference(ioManager); + public List<IVirtualBufferCache> getVirtualBufferCaches(INCServiceContext ctx, FileReference fileRef) + throws HyracksDataException { + IIOManager ioManager = ctx.getIoManager(); + int deviceId = getDeviceId(ioManager, fileRef); + return ((INcApplicationContext) ctx.getApplicationContext()).getDatasetLifecycleManager() + .getVirtualBufferCaches(datasetId, deviceId); + } + + public static int getDeviceId(IIOManager ioManager, FileReference fileRef) { IODeviceHandle device = fileRef.getDeviceHandle(); List<IODeviceHandle> devices = ioManager.getIODevices(); int deviceId = 0; @@ -56,8 +57,7 @@ public class AsterixVirtualBufferCacheProvider implements IVirtualBufferCachePro deviceId = i; } } - return ((INcApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext()) - .getDatasetLifecycleManager().getVirtualBufferCaches(datasetID, deviceId); + return deviceId; } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java index 5c1d094..9cb1de5 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java @@ -19,11 +19,11 @@ package org.apache.asterix.common.context; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback; -import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker; import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; +import org.apache.hyracks.storage.common.IModificationOperationCallback; +import org.apache.hyracks.storage.common.ISearchOperationCallback; public class BaseOperationTracker implements ILSMOperationTracker { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java index 17da26a..cd1d95e 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java @@ -27,7 +27,6 @@ import java.util.Set; import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager; import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState; @@ -42,11 +41,11 @@ public class CorrelatedPrefixMergePolicy implements ILSMMergePolicy { private int maxToleranceComponentCount; private final IDatasetLifecycleManager datasetLifecycleManager; - private final int datasetID; + private final int datasetId; - public CorrelatedPrefixMergePolicy(IResourceLifecycleManager datasetLifecycleManager, int datasetID) { - this.datasetLifecycleManager = (DatasetLifecycleManager) datasetLifecycleManager; - this.datasetID = datasetID; + public CorrelatedPrefixMergePolicy(IDatasetLifecycleManager datasetLifecycleManager, int datasetId) { + this.datasetLifecycleManager = datasetLifecycleManager; + this.datasetId = datasetId; } @Override @@ -80,7 +79,7 @@ public class CorrelatedPrefixMergePolicy implements ILSMMergePolicy { int startIndex = -1; int minNumComponents = Integer.MAX_VALUE; - Set<ILSMIndex> indexes = datasetLifecycleManager.getDatasetInfo(datasetID).getDatasetIndexes(); + Set<ILSMIndex> indexes = datasetLifecycleManager.getDatasetInfo(datasetId).getDatasetIndexes(); for (ILSMIndex lsmIndex : indexes) { minNumComponents = Math.min(minNumComponents, lsmIndex.getImmutableComponents().size()); } @@ -122,8 +121,10 @@ public class CorrelatedPrefixMergePolicy implements ILSMMergePolicy { @Override public void configure(Map<String, String> properties) { - maxMergableComponentSize = Long.parseLong(properties.get("max-mergable-component-size")); - maxToleranceComponentCount = Integer.parseInt(properties.get("max-tolerance-component-count")); + maxMergableComponentSize = + Long.parseLong(properties.get(CorrelatedPrefixMergePolicyFactory.KEY_MAX_COMPONENT_SIZE)); + maxToleranceComponentCount = + Integer.parseInt(properties.get(CorrelatedPrefixMergePolicyFactory.KEY_MAX_COMPONENT_COUNT)); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java index cec9f57..3c141bc 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java @@ -24,35 +24,26 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; -import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.api.IDatasetLifecycleManager; -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager; +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.hyracks.api.application.INCServiceContext; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; public class CorrelatedPrefixMergePolicyFactory implements ILSMMergePolicyFactory { private static final long serialVersionUID = 1L; + public static final String NAME = "correlated-prefix"; + public static final String KEY_DATASET_ID = "datasetId"; + public static final String KEY_MAX_COMPONENT_SIZE = "max-mergable-component-size"; + public static final String KEY_MAX_COMPONENT_COUNT = "max-tolerance-component-count"; - private static final String[] SET_VALUES = new String[] { "max-mergable-component-size", - "max-tolerance-component-count" }; + private static final String[] SET_VALUES = new String[] { KEY_MAX_COMPONENT_SIZE, KEY_MAX_COMPONENT_COUNT }; private static final Set<String> PROPERTIES_NAMES = new HashSet<>(Arrays.asList(SET_VALUES)); - private int datasetID; - - @Override - public ILSMMergePolicy createMergePolicy(Map<String, String> properties, IHyracksTaskContext ctx) { - IDatasetLifecycleManager dslcManager = ((INcApplicationContext) ctx.getJobletContext() - .getServiceContext().getApplicationContext()).getDatasetLifecycleManager(); - ILSMMergePolicy policy = new CorrelatedPrefixMergePolicy(dslcManager, datasetID); - policy.configure(properties); - return policy; - } - @Override public String getName() { - return "correlated-prefix"; + return NAME; } @Override @@ -61,13 +52,12 @@ public class CorrelatedPrefixMergePolicyFactory implements ILSMMergePolicyFactor } @Override - public ILSMMergePolicy createMergePolicy(Map<String, String> properties, IResourceLifecycleManager ilcm) { - ILSMMergePolicy policy = new CorrelatedPrefixMergePolicy(ilcm, datasetID); - policy.configure(properties); + public ILSMMergePolicy createMergePolicy(Map<String, String> configuration, INCServiceContext ctx) { + IDatasetLifecycleManager dslcManager = + ((INcApplicationContext) ctx.getApplicationContext()).getDatasetLifecycleManager(); + int datasetId = Integer.parseInt(configuration.get(KEY_DATASET_ID)); + ILSMMergePolicy policy = new CorrelatedPrefixMergePolicy(dslcManager, datasetId); + policy.configure(configuration); return policy; } - - public void setDatasetID(int datasetID) { - this.datasetID = datasetID; - } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java index 1a8ccae..9529366 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java @@ -28,24 +28,24 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.asterix.common.config.StorageProperties; +import org.apache.asterix.common.dataflow.DatasetLocalResource; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback; import org.apache.asterix.common.replication.IReplicationStrategy; import org.apache.asterix.common.transactions.ILogManager; import org.apache.asterix.common.transactions.LogRecord; -import org.apache.asterix.common.transactions.Resource; import org.apache.asterix.common.utils.TransactionUtil; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.lifecycle.ILifeCycleComponent; -import org.apache.hyracks.storage.am.common.api.IIndex; import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker; import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache; import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex; -import org.apache.hyracks.storage.common.file.ILocalResourceRepository; -import org.apache.hyracks.storage.common.file.LocalResource; +import org.apache.hyracks.storage.common.IIndex; +import org.apache.hyracks.storage.common.ILocalResourceRepository; +import org.apache.hyracks.storage.common.LocalResource; public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeCycleComponent { private final Map<Integer, DatasetResource> datasets = new ConcurrentHashMap<>(); @@ -59,9 +59,8 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC private final int numPartitions; private volatile boolean stopped = false; - public DatasetLifecycleManager(StorageProperties storageProperties, - ILocalResourceRepository resourceRepository, int firstAvilableUserDatasetID, ILogManager logManager, - int numPartitions) { + public DatasetLifecycleManager(StorageProperties storageProperties, ILocalResourceRepository resourceRepository, + int firstAvilableUserDatasetID, ILogManager logManager, int numPartitions) { this.logManager = logManager; this.storageProperties = storageProperties; this.resourceRepository = resourceRepository; @@ -107,7 +106,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC if (lr == null) { return -1; } - return ((Resource) lr.getResource()).datasetId(); + return ((DatasetLocalResource) lr.getResource()).getDatasetId(); } public long getResourceIDfromResourcePath(String resourcePath) throws HyracksDataException { @@ -209,8 +208,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC for (DatasetResource dsr : datasetsResources) { PrimaryIndexOperationTracker opTracker = dsr.getOpTracker(); if (opTracker != null && opTracker.getNumActiveOperations() == 0 - && dsr.getDatasetInfo().getReferenceCount() == 0 - && dsr.getDatasetInfo().isOpen() + && dsr.getDatasetInfo().getReferenceCount() == 0 && dsr.getDatasetInfo().isOpen() && dsr.getDatasetInfo().getDatasetID() >= getFirstAvilableUserDatasetID()) { closeDataset(dsr.getDatasetInfo()); return true; @@ -221,8 +219,8 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC private static void flushAndWaitForIO(DatasetInfo dsInfo, IndexInfo iInfo) throws HyracksDataException { if (iInfo.isOpen()) { - ILSMIndexAccessor accessor = iInfo.getIndex().createAccessor(NoOpOperationCallback.INSTANCE, - NoOpOperationCallback.INSTANCE); + ILSMIndexAccessor accessor = + iInfo.getIndex().createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); accessor.scheduleFlush(iInfo.getIndex().getIOOperationCallback()); } @@ -250,8 +248,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC DatasetInfo dsInfo = new DatasetInfo(did); PrimaryIndexOperationTracker opTracker = new PrimaryIndexOperationTracker(did, logManager, dsInfo); DatasetVirtualBufferCaches vbcs = new DatasetVirtualBufferCaches(did, storageProperties, - getFirstAvilableUserDatasetID(), - getNumPartitions()); + getFirstAvilableUserDatasetID(), getNumPartitions()); dsr = new DatasetResource(dsInfo, opTracker, vbcs); datasets.put(did, dsr); } @@ -320,8 +317,8 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC } @Override - public PrimaryIndexOperationTracker getOperationTracker(int datasetID) { - return datasets.get(datasetID).getOpTracker(); + public PrimaryIndexOperationTracker getOperationTracker(int datasetId) { + return datasets.get(datasetId).getOpTracker(); } private void validateDatasetLifecycleManagerState() throws HyracksDataException { @@ -357,8 +354,8 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC PrimaryIndexOperationTracker opTracker = dsr.getOpTracker(); synchronized (opTracker) { for (IndexInfo iInfo : dsr.getIndexes().values()) { - AbstractLSMIOOperationCallback ioCallback = (AbstractLSMIOOperationCallback) iInfo.getIndex() - .getIOOperationCallback(); + AbstractLSMIOOperationCallback ioCallback = + (AbstractLSMIOOperationCallback) iInfo.getIndex().getIOOperationCallback(); if (!(((AbstractLSMIndex) iInfo.getIndex()).isCurrentMutableComponentEmpty() || ioCallback.hasPendingFlush() || opTracker.isFlushLogCreated() || opTracker.isFlushOnExit())) { @@ -400,16 +397,16 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC } for (IndexInfo iInfo : dsInfo.getIndexes().values()) { //update resource lsn - AbstractLSMIOOperationCallback ioOpCallback = (AbstractLSMIOOperationCallback) iInfo.getIndex() - .getIOOperationCallback(); + AbstractLSMIOOperationCallback ioOpCallback = + (AbstractLSMIOOperationCallback) iInfo.getIndex().getIOOperationCallback(); ioOpCallback.updateLastLSN(logRecord.getLSN()); } } if (asyncFlush) { for (IndexInfo iInfo : dsInfo.getIndexes().values()) { - ILSMIndexAccessor accessor = iInfo.getIndex().createAccessor(NoOpOperationCallback.INSTANCE, - NoOpOperationCallback.INSTANCE); + ILSMIndexAccessor accessor = + iInfo.getIndex().createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); accessor.scheduleFlush(iInfo.getIndex().getIOOperationCallback()); } } else { @@ -501,9 +498,8 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC sb.append(String.format(dsHeaderFormat, "DatasetID", "Open", "Reference Count", "Last Access")); for (DatasetResource dsr : datasets.values()) { DatasetInfo dsInfo = dsr.getDatasetInfo(); - sb.append( - String.format(dsFormat, dsInfo.getDatasetID(), dsInfo.isOpen(), dsInfo.getReferenceCount(), - dsInfo.getLastAccess())); + sb.append(String.format(dsFormat, dsInfo.getDatasetID(), dsInfo.isOpen(), dsInfo.getReferenceCount(), + dsInfo.getLastAccess())); } sb.append("\n"); @@ -514,8 +510,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC for (Map.Entry<Long, IndexInfo> entry : dsInfo.getIndexes().entrySet()) { IndexInfo iInfo = entry.getValue(); sb.append(String.format(idxFormat, dsInfo.getDatasetID(), entry.getKey(), iInfo.isOpen(), - iInfo.getReferenceCount(), - iInfo.getIndex())); + iInfo.getReferenceCount(), iInfo.getIndex())); } } outputStream.write(sb.toString().getBytes()); @@ -557,7 +552,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC while (used + additionalSize > capacity) { if (!evictCandidateDataset()) { throw new HyracksDataException("Cannot allocate dataset " + dsInfo.getDatasetID() - + " memory since memory budget would be exceeded."); + + " memory since memory budget would be exceeded."); } } used += additionalSize; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java index 41e587d..a880ce2 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java @@ -21,8 +21,8 @@ package org.apache.asterix.common.context; import java.util.Map; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.storage.am.common.api.IIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; +import org.apache.hyracks.storage.common.IIndex; /** * A dataset can be in one of two states { EVICTED , LOADED }. http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IStorageComponentProvider.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IStorageComponentProvider.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IStorageComponentProvider.java index d454349..f528f12 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IStorageComponentProvider.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IStorageComponentProvider.java @@ -20,7 +20,6 @@ package org.apache.asterix.common.context; import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider; import org.apache.hyracks.algebricks.data.ITypeTraitProvider; -import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider; import org.apache.hyracks.storage.am.common.api.IMetadataPageManagerFactory; import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider; @@ -42,12 +41,6 @@ public interface IStorageComponentProvider { ILSMIOOperationSchedulerProvider getIoOperationSchedulerProvider(); /** - * @return the application's root - * {@link org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider} instance - */ - IIndexLifecycleManagerProvider getIndexLifecycleManagerProvider(); - - /** * @return {@link org.apache.hyracks.storage.common.IStorageManager} instance */ IStorageManager getStorageManager(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java index feeb578..903bb50 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java @@ -29,8 +29,6 @@ import org.apache.asterix.common.transactions.ILogManager; import org.apache.asterix.common.transactions.LogRecord; import org.apache.asterix.common.utils.TransactionUtil; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback; -import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback; import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; @@ -38,6 +36,8 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker; import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; +import org.apache.hyracks.storage.common.IModificationOperationCallback; +import org.apache.hyracks.storage.common.ISearchOperationCallback; public class PrimaryIndexOperationTracker extends BaseOperationTracker { @@ -143,20 +143,16 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker { //This method is called sequentially by LogPage.notifyFlushTerminator in the sequence flushes were scheduled. public synchronized void triggerScheduleFlush(LogRecord logRecord) throws HyracksDataException { for (ILSMIndex lsmIndex : dsInfo.getDatasetIndexes()) { - //get resource ILSMIndexAccessor accessor = lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - //update resource lsn AbstractLSMIOOperationCallback ioOpCallback = (AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback(); ioOpCallback.updateLastLSN(logRecord.getLSN()); - //schedule flush after update accessor.scheduleFlush(lsmIndex.getIOOperationCallback()); } - flushLogCreated = false; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/DatasetLocalResource.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/DatasetLocalResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/DatasetLocalResource.java new file mode 100644 index 0000000..78b5cb2 --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/DatasetLocalResource.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.dataflow; + +import org.apache.hyracks.api.application.INCServiceContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.common.IIndex; +import org.apache.hyracks.storage.common.IResource; + +/** + * A local resource with a dataset id and an assigned partition + */ +public class DatasetLocalResource implements IResource { + + private static final long serialVersionUID = 1L; + /** + * The dataset id + */ + private final int datasetId; + /** + * The resource partition + */ + private final int partition; + private final IResource resource; + + public DatasetLocalResource(int datasetId, int partition, IResource resource) { + this.datasetId = datasetId; + this.partition = partition; + this.resource = resource; + } + + public int getPartition() { + return partition; + } + + public int getDatasetId() { + return datasetId; + } + + @Override + public String getPath() { + return resource.getPath(); + } + + @Override + public IIndex createInstance(INCServiceContext ncServiceCtx) throws HyracksDataException { + return resource.createInstance(ncServiceCtx); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java index e5b78cf..b7adfde 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java @@ -25,72 +25,63 @@ import org.apache.asterix.common.transactions.IResourceIdManager; import org.apache.hyracks.api.application.ICCServiceContext; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.job.IJobLifecycleListener; -import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider; import org.apache.hyracks.storage.common.IStorageManager; /** * Provides methods for obtaining - * {@link org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider}, * {@link org.apache.hyracks.storage.common.IStorageManager}, * {@link org.apache.hyracks.api.application.ICCServiceContext}, * {@link org.apache.asterix.common.cluster.IGlobalRecoveryManager}, - * and {@link org.apache.asterix.common.library.ILibraryManager} + * {@link org.apache.asterix.common.library.ILibraryManager}, + * {@link org.apache.asterix.common.transactions.IResourceIdManager} + * * at the cluster controller side. */ public interface ICcApplicationContext extends IApplicationContext { /** - * Returns an instance of the implementation for IIndexLifecycleManagerProvider. - * - * @return IIndexLifecycleManagerProvider implementation instance - */ - public IIndexLifecycleManagerProvider getIndexLifecycleManagerProvider(); - - /** * @return an instance which implements {@link org.apache.hyracks.storage.common.IStorageManager} */ - public IStorageManager getStorageManager(); + IStorageManager getStorageManager(); /** * @return an instance which implements {@link org.apache.hyracks.api.application.ICCServiceContext} */ @Override - public ICCServiceContext getServiceContext(); + ICCServiceContext getServiceContext(); /** * @return the global recovery manager which implements * {@link org.apache.asterix.common.cluster.IGlobalRecoveryManager} */ - public IGlobalRecoveryManager getGlobalRecoveryManager(); + IGlobalRecoveryManager getGlobalRecoveryManager(); /** * @return the active lifecycle listener at Cluster controller */ - public IJobLifecycleListener getActiveLifecycleListener(); + IJobLifecycleListener getActiveLifecycleListener(); /** * @return a new instance of {@link IHyracksClientConnection} */ - public IHyracksClientConnection getHcc(); + IHyracksClientConnection getHcc(); /** - * Returns the resource manager - * - * @return {@link IResourceIdManager} implementation instance + * @return the cluster wide resource id manager */ - public IResourceIdManager getResourceIdManager(); + IResourceIdManager getResourceIdManager(); /** * Returns the storage component provider * * @return {@link IStorageComponentProvider} implementation instance */ - public IStorageComponentProvider getStorageComponentProvider(); + IStorageComponentProvider getStorageComponentProvider(); /** * Returns the extension manager * * @return the extension manager instance */ - public Object getExtensionManager(); + Object getExtensionManager(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java index 4b70dd7..3f9fba9 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java @@ -25,7 +25,6 @@ import org.apache.asterix.common.transactions.ILogMarkerCallback; import org.apache.asterix.common.transactions.PrimaryIndexLogMarkerCallback; import org.apache.hyracks.api.comm.VSizeFrame; import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -34,8 +33,9 @@ import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference; import org.apache.hyracks.dataflow.common.utils.TaskUtil; +import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory; import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory; -import org.apache.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor; +import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; @@ -64,16 +64,17 @@ public class LSMInsertDeleteOperatorNodePushable extends LSMIndexInsertUpdateDel private int currentTupleIdx; private int lastFlushedTupleIdx; - public LSMInsertDeleteOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, - int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, IndexOperation op, - boolean isPrimary) throws HyracksDataException { - super(opDesc, ctx, partition, fieldPermutation, recordDescProvider, op); + public LSMInsertDeleteOperatorNodePushable(IHyracksTaskContext ctx, int partition, int[] fieldPermutation, + RecordDescriptor inputRecDesc, IndexOperation op, boolean isPrimary, + IIndexDataflowHelperFactory indexHelperFactory, IModificationOperationCallbackFactory modCallbackFactory, + ITupleFilterFactory tupleFilterFactory) throws HyracksDataException { + super(ctx, partition, indexHelperFactory, fieldPermutation, inputRecDesc, op, modCallbackFactory, + tupleFilterFactory); this.isPrimary = isPrimary; } @Override public void open() throws HyracksDataException { - RecordDescriptor inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0); accessor = new FrameTupleAccessor(inputRecDesc); writeBuffer = new VSizeFrame(ctx); appender = new FrameTupleAppender(writeBuffer); @@ -85,10 +86,9 @@ public class LSMInsertDeleteOperatorNodePushable extends LSMIndexInsertUpdateDel TaskUtil.putInSharedMap(ILogMarkerCallback.KEY_MARKER_CALLBACK, callback, ctx); } writer.open(); - modCallback = opDesc.getModificationOpCallbackFactory().createModificationOperationCallback( - indexHelper.getResource(), ctx, this); + modCallback = + modOpCallbackFactory.createModificationOperationCallback(indexHelper.getResource(), ctx, this); indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE); - ITupleFilterFactory tupleFilterFactory = opDesc.getTupleFilterFactory(); if (tupleFilterFactory != null) { tupleFilter = tupleFilterFactory.createTupleFilter(ctx); frameTuple = new FrameTupleReference();