abdullah alamoudi has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/962
Change subject: Add Test NodeController ...................................................................... Add Test NodeController This change enables creating a node controller for unit test purposes. The Node controller is identical to the regular node controller except that it doesn't communicate with a cluster controller at all. Change-Id: I3b9aa8de758b7d26ca34868b16e5ce693e0c0243 --- A asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java 2 files changed, 542 insertions(+), 7 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/62/962/1 diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java new file mode 100644 index 0000000..7560675 --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java @@ -0,0 +1,502 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.app.bootstrap; + +import java.io.File; +import java.util.Arrays; +import java.util.Map; + +import org.apache.asterix.algebra.operators.physical.CommitRuntime; +import org.apache.asterix.api.common.AsterixAppRuntimeContext; +import org.apache.asterix.common.api.ILocalResourceMetadata; +import org.apache.asterix.common.config.AsterixStorageProperties; +import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider; +import org.apache.asterix.common.context.DatasetLifecycleManager; +import org.apache.asterix.common.context.TransactionSubsystemProvider; +import org.apache.asterix.common.dataflow.AsterixLSMInsertDeleteOperatorNodePushable; +import org.apache.asterix.common.dataflow.AsterixLSMTreeInsertDeleteOperatorDescriptor; +import org.apache.asterix.common.exceptions.ACIDException; +import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory; +import org.apache.asterix.common.messaging.ResourceIdRequestMessage; +import org.apache.asterix.common.messaging.ResourceIdRequestResponseMessage; +import org.apache.asterix.common.messaging.api.IApplicationMessageCallback; +import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType; +import org.apache.asterix.common.transactions.ITransactionManager; +import org.apache.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider; +import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider; +import org.apache.asterix.formats.nontagged.AqlTypeTraitProvider; +import org.apache.asterix.hyracks.bootstrap.NCApplicationEntryPoint; +import org.apache.asterix.messaging.NCMessageBroker; +import org.apache.asterix.metadata.entities.Dataset; +import org.apache.asterix.metadata.utils.DatasetUtils; +import org.apache.asterix.om.types.ARecordType; +import org.apache.asterix.om.types.IAType; +import org.apache.asterix.runtime.formats.NonTaggedDataFormat; +import 
org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory; +import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerProvider; +import org.apache.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadata; +import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider; +import org.apache.asterix.transaction.management.service.logging.LogReader; +import org.apache.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider; +import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.runtime.base.IPushRuntime; +import org.apache.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory; +import org.apache.hyracks.api.comm.IFrameWriter; +import org.apache.hyracks.api.context.IHyracksJobletContext; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; +import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; +import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; +import org.apache.hyracks.api.dataflow.value.ITypeTraits; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.HyracksException; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.control.common.controllers.NCConfig; +import org.apache.hyracks.control.nc.NodeControllerService; +import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider; +import org.apache.hyracks.dataflow.std.file.FileSplit; +import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor; +import 
org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorNodePushable; +import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider; +import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory; +import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelper; +import org.apache.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor; +import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory; +import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; +import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelper; +import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; +import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory; +import org.apache.hyracks.storage.common.file.ILocalResourceFactoryProvider; +import org.apache.hyracks.storage.common.file.LocalResource; +import org.apache.hyracks.test.support.TestUtils; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +public class TestNodeController { + // Constants + public static final int DEFAULT_HYRACKS_CC_CLIENT_PORT = 1098; + public static final int DEFAULT_HYRACKS_CC_CLUSTER_PORT = 1099; + public static final int KB32 = 32768; + public static final int PARTITION = 0; + public static final TransactionSubsystemProvider TXN_SUBSYSTEM_PROVIDER = new TransactionSubsystemProvider(); + // Final + private final String nodeName; + private final String ioDeviceDirList; + // Mutables + private JobId jobId; + private long jobCounter = 0L; + private long resourceCounter = 0; + private NodeControllerService nodeControllerService; + private IIndexLifecycleManagerProvider ilcmp; + private IHyracksJobletContext jobletCtx; + private AsterixStorageProperties storageProperties; + + public TestNodeController(String 
nodeName, String ioDeviceDirList) + throws AsterixException, HyracksException, ACIDException { + this.nodeName = nodeName; + this.ioDeviceDirList = ioDeviceDirList; + } + + public void init() throws Exception { + String ncName = nodeName; + NCConfig ncConfig = new NCConfig(); + ncConfig.nodeId = ncName; + ncConfig.resultSweepThreshold = 1000; + ncConfig.appArgs = Arrays.asList("-virtual-NC"); + ncConfig.ioDevices = ioDeviceDirList; + ncConfig.ccHost = "localhost"; + ncConfig.ccPort = DEFAULT_HYRACKS_CC_CLUSTER_PORT; + ncConfig.clusterNetIPAddress = "127.0.0.1"; + ncConfig.dataIPAddress = "127.0.0.1"; + ncConfig.resultIPAddress = "127.0.0.1"; + ncConfig.resultTTL = 30000; + String[] nodeStores = ioDeviceDirList.split(","); + for (int p = 0; p < nodeStores.length; p++) { + // create IO devices based on stores + File ioDeviceDir = new File(nodeStores[p]); + ioDeviceDir.mkdirs(); + } + ncConfig.appNCMainClass = NCApplicationEntryPoint.class.getName(); + nodeControllerService = new NodeControllerService(ncConfig); + nodeControllerService.testStart(); + NCMessageBroker messageBroker = (NCMessageBroker) Mockito + .spy(nodeControllerService.getApplicationContext().getMessageBroker()); + Mockito.doNothing().when(messageBroker).reportMaxResourceId(); + Mockito.doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + IApplicationMessageCallback callback = invocation.getArgumentAt(1, IApplicationMessageCallback.class); + ResourceIdRequestResponseMessage message = new ResourceIdRequestResponseMessage(); + message.setId(resourceCounter++); + callback.deliverMessageResponse(message); + return null; + } + }).when(messageBroker).sendMessage(Mockito.any(ResourceIdRequestMessage.class), Mockito.any()); + nodeControllerService.getApplicationContext().setMessageBroker(messageBroker); + nodeControllerService.testStartComplete(); + ilcmp = Mockito.mock(IIndexLifecycleManagerProvider.class); + 
Mockito.when(ilcmp.getLifecycleManager(Mockito.any())) + .thenReturn(getAppRuntimeContext().getDatasetLifecycleManager()); + getAppRuntimeContext().getLSMIOScheduler(); + jobletCtx = Mockito.mock(IHyracksJobletContext.class); + Mockito.when(jobletCtx.getApplicationContext()).thenReturn(nodeControllerService.getApplicationContext()); + Mockito.when(jobletCtx.getJobId()).thenAnswer(new Answer<JobId>() { + @Override + public JobId answer(InvocationOnMock invocation) throws Throwable { + return jobId; + } + }); + storageProperties = getAppRuntimeContext().getStorageProperties(); + } + + public void deInit() throws Exception { + nodeControllerService.testStop(); + } + + public org.apache.asterix.common.transactions.JobId getTxnJobId() { + return new org.apache.asterix.common.transactions.JobId((int) jobId.getId()); + } + + public AsterixLSMInsertDeleteOperatorNodePushable getInsertPipeline(IHyracksTaskContext ctx, Dataset dataset, + IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType, + ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties, int[] filterFields) + throws AlgebricksException { + PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType, + mergePolicyFactory, mergePolicyProperties, filterFields); + IndexOperation op = IndexOperation.INSERT; + IModificationOperationCallbackFactory modOpCallbackFactory = new PrimaryIndexModificationOperationCallbackFactory( + getTxnJobId(), dataset.getDatasetId(), primaryIndexInfo.primaryKeyIndexes, TXN_SUBSYSTEM_PROVIDER, op, + ResourceType.LSM_BTREE, true); + AsterixLSMTreeInsertDeleteOperatorDescriptor indexOpDesc = getInsertOpratorDesc(primaryIndexInfo, + modOpCallbackFactory); + LSMBTreeDataflowHelperFactory dataflowHelperFactory = getPrimaryIndexDataflowHelperFactory(ctx, + primaryIndexInfo); + Mockito.when(indexOpDesc.getIndexDataflowHelperFactory()).thenReturn(dataflowHelperFactory); + IRecordDescriptorProvider 
recordDescProvider = primaryIndexInfo.getInsertRecordDescriptorProvider(); + AsterixLSMInsertDeleteOperatorNodePushable insertOp = new AsterixLSMInsertDeleteOperatorNodePushable( + indexOpDesc, ctx, PARTITION, primaryIndexInfo.primaryIndexInsertFieldsPermutations, recordDescProvider, + op, true); + CommitRuntime commitOp = new CommitRuntime(ctx, getTxnJobId(), dataset.getDatasetId(), + primaryIndexInfo.primaryKeyIndexes, false, true, PARTITION); + insertOp.setOutputFrameWriter(0, commitOp, primaryIndexInfo.rDesc); + commitOp.setInputRecordDescriptor(0, primaryIndexInfo.rDesc); + return insertOp; + } + + public IPushRuntime getFullScanPipeline(IFrameWriter countOp, IHyracksTaskContext ctx, Dataset dataset, + IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType, + NoMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties, int[] filterFields) + throws HyracksDataException, AlgebricksException { + IPushRuntime emptyTupleOp = new EmptyTupleSourceRuntimeFactory().createPushRuntime(ctx); + JobSpecification spec = new JobSpecification(); + PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType, + mergePolicyFactory, mergePolicyProperties, filterFields); + LSMBTreeDataflowHelperFactory indexDataflowHelperFactory = getPrimaryIndexDataflowHelperFactory(ctx, + primaryIndexInfo); + BTreeSearchOperatorDescriptor searchOpDesc = new BTreeSearchOperatorDescriptor(spec, primaryIndexInfo.rDesc, + AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, + primaryIndexInfo.fileSplitProvider, primaryIndexInfo.primaryIndexTypeTraits, + primaryIndexInfo.primaryIndexComparatorFactories, primaryIndexInfo.primaryIndexBloomFilterKeyFields, + primaryIndexInfo.primaryKeyIndexes, primaryIndexInfo.primaryKeyIndexes, true, true, + indexDataflowHelperFactory, false, false, null, NoOpOperationCallbackFactory.INSTANCE, filterFields, + filterFields); + 
BTreeSearchOperatorNodePushable searchOp = new BTreeSearchOperatorNodePushable(searchOpDesc, ctx, 0, + primaryIndexInfo.getSearchRecordDescriptorProvider(), /*primaryIndexInfo.primaryKeyIndexes*/null, + /*primaryIndexInfo.primaryKeyIndexes*/null, true, true, filterFields, filterFields); + emptyTupleOp.setFrameWriter(0, searchOp, + primaryIndexInfo.getSearchRecordDescriptorProvider().getInputRecordDescriptor(null, 0)); + searchOp.setOutputFrameWriter(0, countOp, primaryIndexInfo.rDesc); + return emptyTupleOp; + } + + public LogReader getTransactionLogReader(boolean isRecoveryMode) { + return (LogReader) getTransactionSubsystem().getLogManager().getLogReader(isRecoveryMode); + } + + public JobId newJobId() { + jobId = new JobId(jobCounter++); + return jobId; + } + + public AsterixLSMTreeInsertDeleteOperatorDescriptor getInsertOpratorDesc(PrimaryIndexInfo primaryIndexInfo, + IModificationOperationCallbackFactory modOpCallbackFactory) { + AsterixLSMTreeInsertDeleteOperatorDescriptor indexOpDesc = Mockito + .mock(AsterixLSMTreeInsertDeleteOperatorDescriptor.class); + Mockito.when(indexOpDesc.getLifecycleManagerProvider()).thenReturn(ilcmp); + Mockito.when(indexOpDesc.getStorageManager()).thenReturn(AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER); + Mockito.when(indexOpDesc.getFileSplitProvider()).thenReturn(primaryIndexInfo.fileSplitProvider); + Mockito.when(indexOpDesc.getLocalResourceFactoryProvider()) + .thenReturn(primaryIndexInfo.localResourceFactoryProvider); + Mockito.when(indexOpDesc.getTreeIndexTypeTraits()).thenReturn(primaryIndexInfo.primaryIndexTypeTraits); + Mockito.when(indexOpDesc.getTreeIndexComparatorFactories()) + .thenReturn(primaryIndexInfo.primaryIndexComparatorFactories); + Mockito.when(indexOpDesc.getTreeIndexBloomFilterKeyFields()) + .thenReturn(primaryIndexInfo.primaryIndexBloomFilterKeyFields); + Mockito.when(indexOpDesc.getModificationOpCallbackFactory()).thenReturn(modOpCallbackFactory); + return indexOpDesc; + } + + public 
TreeIndexCreateOperatorDescriptor getIndexCreateOpDesc(PrimaryIndexInfo primaryIndexInfo) { + TreeIndexCreateOperatorDescriptor indexOpDesc = Mockito.mock(TreeIndexCreateOperatorDescriptor.class); + Mockito.when(indexOpDesc.getLifecycleManagerProvider()).thenReturn(ilcmp); + Mockito.when(indexOpDesc.getStorageManager()).thenReturn(AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER); + Mockito.when(indexOpDesc.getFileSplitProvider()).thenReturn(primaryIndexInfo.fileSplitProvider); + Mockito.when(indexOpDesc.getLocalResourceFactoryProvider()) + .thenReturn(primaryIndexInfo.localResourceFactoryProvider); + Mockito.when(indexOpDesc.getTreeIndexTypeTraits()).thenReturn(primaryIndexInfo.primaryIndexTypeTraits); + Mockito.when(indexOpDesc.getTreeIndexComparatorFactories()) + .thenReturn(primaryIndexInfo.primaryIndexComparatorFactories); + Mockito.when(indexOpDesc.getTreeIndexBloomFilterKeyFields()) + .thenReturn(primaryIndexInfo.primaryIndexBloomFilterKeyFields); + return indexOpDesc; + } + + public ConstantFileSplitProvider getFileSplitProvider(Dataset dataset) { + FileSplit fileSplit = new FileSplit(nodeName, + dataset.getDataverseName() + File.separator + dataset.getDatasetName()); + return new ConstantFileSplitProvider(new FileSplit[] { fileSplit }); + } + + public ILocalResourceFactoryProvider getPrimaryIndexLocalResourceMetadataProvider(Dataset dataset, + ITypeTraits[] primaryIndexTypeTraits, IBinaryComparatorFactory[] primaryIndexComparatorFactories, + int[] primaryIndexBloomFilterKeyFields, ILSMMergePolicyFactory mergePolicyFactory, + Map<String, String> mergePolicyProperties, ITypeTraits[] filterTypeTraits, + IBinaryComparatorFactory[] filterCmpFactories, int[] btreeFields, int[] filterFields) { + ILocalResourceMetadata localResourceMetadata = new LSMBTreeLocalResourceMetadata(primaryIndexTypeTraits, + primaryIndexComparatorFactories, primaryIndexBloomFilterKeyFields, true, dataset.getDatasetId(), + mergePolicyFactory, mergePolicyProperties, filterTypeTraits, 
filterCmpFactories, btreeFields, + filterFields); + ILocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider( + localResourceMetadata, LocalResource.LSMBTreeResource); + return localResourceFactoryProvider; + } + + public LSMBTreeDataflowHelper getPrimaryIndexDataflowHelper(IHyracksTaskContext ctx, + PrimaryIndexInfo primaryIndexInfo, TreeIndexCreateOperatorDescriptor indexOpDesc) + throws AlgebricksException { + LSMBTreeDataflowHelperFactory dataflowHelperFactory = new LSMBTreeDataflowHelperFactory( + new AsterixVirtualBufferCacheProvider(primaryIndexInfo.dataset.getDatasetId()), + primaryIndexInfo.mergePolicyFactory, primaryIndexInfo.mergePolicyProperties, + new PrimaryIndexOperationTrackerProvider(primaryIndexInfo.dataset.getDatasetId()), + AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE, + storageProperties.getBloomFilterFalsePositiveRate(), true, primaryIndexInfo.filterTypeTraits, + primaryIndexInfo.filterCmpFactories, primaryIndexInfo.btreeFields, primaryIndexInfo.filterFields, true); + IndexDataflowHelper dataflowHelper = dataflowHelperFactory.createIndexDataflowHelper(indexOpDesc, ctx, 0); + return (LSMBTreeDataflowHelper) dataflowHelper; + } + + public LSMBTreeDataflowHelperFactory getPrimaryIndexDataflowHelperFactory(IHyracksTaskContext ctx, + PrimaryIndexInfo primaryIndexInfo) throws AlgebricksException { + return new LSMBTreeDataflowHelperFactory( + new AsterixVirtualBufferCacheProvider(primaryIndexInfo.dataset.getDatasetId()), + primaryIndexInfo.mergePolicyFactory, primaryIndexInfo.mergePolicyProperties, + new PrimaryIndexOperationTrackerProvider(primaryIndexInfo.dataset.getDatasetId()), + AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE, + storageProperties.getBloomFilterFalsePositiveRate(), true, primaryIndexInfo.filterTypeTraits, + primaryIndexInfo.filterCmpFactories, primaryIndexInfo.btreeFields, 
primaryIndexInfo.filterFields, true); + } + + public LSMBTreeDataflowHelper getPrimaryIndexDataflowHelper(Dataset dataset, IAType[] primaryKeyTypes, + ARecordType recordType, ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory, + Map<String, String> mergePolicyProperties, int[] filterFields) + throws AlgebricksException, HyracksDataException { + PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType, + mergePolicyFactory, mergePolicyProperties, filterFields); + TreeIndexCreateOperatorDescriptor indexOpDesc = getIndexCreateOpDesc(primaryIndexInfo); + return getPrimaryIndexDataflowHelper(createTestContext(), primaryIndexInfo, indexOpDesc); + } + + public void createPrimaryIndex(Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType, + ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties, + int[] filterFields) throws AlgebricksException, HyracksDataException { + PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType, + mergePolicyFactory, mergePolicyProperties, filterFields); + TreeIndexCreateOperatorDescriptor indexOpDesc = getIndexCreateOpDesc(primaryIndexInfo); + LSMBTreeDataflowHelper dataflowHelper = getPrimaryIndexDataflowHelper(createTestContext(), primaryIndexInfo, + indexOpDesc); + dataflowHelper.create(); + } + + private int[] createPrimaryIndexBloomFilterFields(int length) { + int[] primaryIndexBloomFilterKeyFields = new int[length]; + for (int j = 0; j < length; ++j) { + primaryIndexBloomFilterKeyFields[j] = j; + } + return primaryIndexBloomFilterKeyFields; + } + + private IBinaryComparatorFactory[] createPrimaryIndexComparatorFactories(IAType[] primaryKeyTypes) { + IBinaryComparatorFactory[] primaryIndexComparatorFactories = new IBinaryComparatorFactory[primaryKeyTypes.length]; + for (int j = 0; j < primaryKeyTypes.length; ++j) { + primaryIndexComparatorFactories[j] = 
AqlBinaryComparatorFactoryProvider.INSTANCE + .getBinaryComparatorFactory(primaryKeyTypes[j], true); + } + return primaryIndexComparatorFactories; + } + + private ISerializerDeserializer<?>[] createPrimaryIndexSerdes(int primaryIndexNumOfTupleFields, + IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType) { + int i = 0; + ISerializerDeserializer<?>[] primaryIndexSerdes = new ISerializerDeserializer<?>[primaryIndexNumOfTupleFields]; + for (; i < primaryKeyTypes.length; i++) { + primaryIndexSerdes[i] = AqlSerializerDeserializerProvider.INSTANCE + .getSerializerDeserializer(primaryKeyTypes[i]); + } + primaryIndexSerdes[i++] = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(recordType); + if (metaType != null) { + primaryIndexSerdes[i] = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(metaType); + } + return primaryIndexSerdes; + } + + private ITypeTraits[] createPrimaryIndexTypeTraits(int primaryIndexNumOfTupleFields, IAType[] primaryKeyTypes, + ARecordType recordType, ARecordType metaType) { + ITypeTraits[] primaryIndexTypeTraits = new ITypeTraits[primaryIndexNumOfTupleFields]; + int i = 0; + for (; i < primaryKeyTypes.length; i++) { + primaryIndexTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(primaryKeyTypes[i]); + } + primaryIndexTypeTraits[i++] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(recordType); + if (metaType != null) { + primaryIndexTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(metaType); + } + return primaryIndexTypeTraits; + } + + public IHyracksTaskContext createTestContext() { + IHyracksTaskContext ctx = TestUtils.create(KB32); + ctx = Mockito.spy(ctx); + Mockito.when(ctx.getJobletContext()).thenReturn(jobletCtx); + Mockito.when(ctx.getIOManager()).thenReturn(nodeControllerService.getRootContext().getIOManager()); + return ctx; + } + + public TransactionSubsystem getTransactionSubsystem() { + return (TransactionSubsystem) ((AsterixAppRuntimeContext) 
nodeControllerService.getApplicationContext() + .getApplicationObject()).getTransactionSubsystem(); + } + + public ITransactionManager getTransactionManager() { + return getTransactionSubsystem().getTransactionManager(); + } + + public AsterixAppRuntimeContext getAppRuntimeContext() { + return (AsterixAppRuntimeContext) nodeControllerService.getApplicationContext().getApplicationObject(); + } + + public DatasetLifecycleManager getDatasetLifecycleManager() { + return (DatasetLifecycleManager) getAppRuntimeContext().getDatasetLifecycleManager(); + } + + @SuppressWarnings("unused") + private class PrimaryIndexInfo { + private Dataset dataset; + private IAType[] primaryKeyTypes; + private ARecordType recordType; + private ARecordType metaType; + private ILSMMergePolicyFactory mergePolicyFactory; + private Map<String, String> mergePolicyProperties; + private int[] filterFields; + private int primaryIndexNumOfTupleFields; + private IBinaryComparatorFactory[] primaryIndexComparatorFactories; + private ITypeTraits[] primaryIndexTypeTraits; + private ISerializerDeserializer<?>[] primaryIndexSerdes; + private int[] primaryIndexBloomFilterKeyFields; + private ITypeTraits[] filterTypeTraits; + private IBinaryComparatorFactory[] filterCmpFactories; + private int[] btreeFields; + private ILocalResourceFactoryProvider localResourceFactoryProvider; + private ConstantFileSplitProvider fileSplitProvider; + private RecordDescriptor rDesc; + private int[] primaryIndexInsertFieldsPermutations; + private int[] primaryKeyIndexes; + + public PrimaryIndexInfo(Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType, + ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties, + int[] filterFields) throws AlgebricksException { + this.dataset = dataset; + this.primaryKeyTypes = primaryKeyTypes; + this.recordType = recordType; + this.metaType = metaType; + this.mergePolicyFactory = mergePolicyFactory; + this.mergePolicyProperties = 
mergePolicyProperties; + this.filterFields = filterFields; + primaryIndexNumOfTupleFields = primaryKeyTypes.length + (1 + ((metaType == null) ? 0 : 1)); + primaryIndexTypeTraits = createPrimaryIndexTypeTraits(primaryIndexNumOfTupleFields, primaryKeyTypes, + recordType, metaType); + primaryIndexComparatorFactories = createPrimaryIndexComparatorFactories(primaryKeyTypes); + primaryIndexBloomFilterKeyFields = createPrimaryIndexBloomFilterFields(primaryKeyTypes.length); + filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recordType); + filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset, recordType, + NonTaggedDataFormat.INSTANCE.getBinaryComparatorFactoryProvider()); + btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset); + localResourceFactoryProvider = getPrimaryIndexLocalResourceMetadataProvider(dataset, primaryIndexTypeTraits, + primaryIndexComparatorFactories, primaryIndexBloomFilterKeyFields, mergePolicyFactory, + mergePolicyProperties, filterTypeTraits, filterCmpFactories, btreeFields, filterFields); + fileSplitProvider = getFileSplitProvider(dataset); + primaryIndexSerdes = createPrimaryIndexSerdes(primaryIndexNumOfTupleFields, primaryKeyTypes, recordType, + metaType); + rDesc = new RecordDescriptor(primaryIndexSerdes, primaryIndexTypeTraits); + primaryIndexInsertFieldsPermutations = new int[primaryIndexNumOfTupleFields]; + for (int i = 0; i < primaryIndexNumOfTupleFields; i++) { + primaryIndexInsertFieldsPermutations[i] = i; + } + primaryKeyIndexes = new int[primaryKeyTypes.length]; + for (int i = 0; i < primaryKeyIndexes.length; i++) { + primaryKeyIndexes[i] = i; + } + } + + public IRecordDescriptorProvider getInsertRecordDescriptorProvider() { + IRecordDescriptorProvider rDescProvider = Mockito.mock(IRecordDescriptorProvider.class); + Mockito.when(rDescProvider.getInputRecordDescriptor(Mockito.any(), Mockito.anyInt())).thenReturn(rDesc); + return rDescProvider; + } + + public 
IRecordDescriptorProvider getSearchRecordDescriptorProvider() { + ITypeTraits[] primaryKeyTypeTraits = new ITypeTraits[primaryKeyTypes.length]; + ISerializerDeserializer<?>[] primaryKeySerdes = new ISerializerDeserializer<?>[primaryKeyTypes.length]; + for (int i = 0; i < primaryKeyTypes.length; i++) { + primaryKeyTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(primaryKeyTypes[i]); + primaryKeySerdes[i] = AqlSerializerDeserializerProvider.INSTANCE + .getSerializerDeserializer(primaryKeyTypes[i]); + } + RecordDescriptor searcgRecDesc = new RecordDescriptor(primaryKeySerdes, primaryKeyTypeTraits); + IRecordDescriptorProvider rDescProvider = Mockito.mock(IRecordDescriptorProvider.class); + Mockito.when(rDescProvider.getInputRecordDescriptor(Mockito.any(), Mockito.anyInt())) + .thenReturn(searcgRecDesc); + return rDescProvider; + } + } + + public RecordDescriptor getSearchOutputDesc(IAType[] keyTypes, ARecordType recordType, ARecordType metaType) { + int primaryIndexNumOfTupleFields = keyTypes.length + (1 + ((metaType == null) ? 
0 : 1)); + ITypeTraits[] primaryIndexTypeTraits = createPrimaryIndexTypeTraits(primaryIndexNumOfTupleFields, keyTypes, + recordType, metaType); + ISerializerDeserializer<?>[] primaryIndexSerdes = createPrimaryIndexSerdes(primaryIndexNumOfTupleFields, + keyTypes, recordType, metaType); + return new RecordDescriptor(primaryIndexSerdes, primaryIndexTypeTraits); + } +} \ No newline at end of file diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java index 2f8def1..aca5dc3 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java @@ -260,7 +260,8 @@ init(); datasetNetworkManager.start(); - IIPCHandle ccIPCHandle = ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort), ncConfig.retries); + IIPCHandle ccIPCHandle = ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort), + ncConfig.retries); this.ccs = new ClusterControllerRemoteProxy(ccIPCHandle); HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()]; for (int i = 0; i < gcInfos.length; ++i) { @@ -270,12 +271,11 @@ // Use "public" versions of network addresses and ports NetworkAddress datasetAddress = datasetNetworkManager.getPublicNetworkAddress(); NetworkAddress netAddress = netManager.getPublicNetworkAddress(); - ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netAddress, - datasetAddress, osMXBean.getName(), osMXBean.getArch(), osMXBean - .getVersion(), osMXBean.getAvailableProcessors(), runtimeMXBean.getVmName(), runtimeMXBean - .getVmVersion(), 
runtimeMXBean.getVmVendor(), runtimeMXBean.getClassPath(), runtimeMXBean - .getLibraryPath(), runtimeMXBean.getBootClassPath(), runtimeMXBean.getInputArguments(), - runtimeMXBean.getSystemProperties(), hbSchema)); + ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netAddress, datasetAddress, + osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean.getAvailableProcessors(), + runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(), + runtimeMXBean.getClassPath(), runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(), + runtimeMXBean.getInputArguments(), runtimeMXBean.getSystemProperties(), hbSchema)); synchronized (this) { while (registrationPending) { @@ -594,4 +594,37 @@ } } } + + /* + * Method below are used for unit tests + */ + + public void testStart() throws Exception { + LOGGER.log(Level.INFO, "Starting NodeControllerService"); + startApplication(); + init(); + LOGGER.log(Level.INFO, "Started NodeControllerService"); + } + + public void testStartComplete() throws Exception { + ncAppEntryPoint.notifyStartupComplete(); + } + + public synchronized void testStop() throws Exception { + if (!shuttedDown) { + LOGGER.log(Level.INFO, "Stopping NodeControllerService"); + executor.shutdownNow(); + if (!executor.awaitTermination(10, TimeUnit.SECONDS)) { + LOGGER.log(Level.SEVERE, "Some jobs failed to exit, continuing shutdown abnormally"); + } + partitionManager.close(); + datasetPartitionManager.close(); + datasetNetworkManager.stop(); + if (ncAppEntryPoint != null) { + ncAppEntryPoint.stop(); + } + LOGGER.log(Level.INFO, "Stopped NodeControllerService"); + shuttedDown = true; + } + } } -- To view, visit https://asterix-gerrit.ics.uci.edu/962 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I3b9aa8de758b7d26ca34868b16e5ce693e0c0243 Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah 
alamoudi <bamou...@gmail.com>