abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/962

Change subject: Add Test NodeController
......................................................................

Add Test NodeController

This change enables creating a node controller for unit-test purposes.
The test node controller is identical to the regular node controller
except that it does not communicate with a cluster controller at all.
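
For illustration, a unit test could drive it roughly as follows. This is
only a sketch; the dataset, key types, record type, and merge policy shown
below are placeholders supplied by the test, not part of this change:

    TestNodeController nc = new TestNodeController("asterix_nc1", "target/test_io_dir");
    nc.init();
    try {
        nc.newJobId();
        // 'dataset', 'primaryKeyTypes', and 'recordType' are defined by the test.
        nc.createPrimaryIndex(dataset, primaryKeyTypes, recordType, null,
                new NoMergePolicyFactory(), null, null);
        IHyracksTaskContext ctx = nc.createTestContext();
        AsterixLSMInsertDeleteOperatorNodePushable insertOp = nc.getInsertPipeline(ctx, dataset,
                primaryKeyTypes, recordType, null, new NoMergePolicyFactory(), null, null);
        insertOp.open();
        // ... push test frames into insertOp ...
        insertOp.close();
    } finally {
        nc.deInit();
    }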

Change-Id: I3b9aa8de758b7d26ca34868b16e5ce693e0c0243
---
A asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
2 files changed, 542 insertions(+), 7 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/62/962/1

diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
new file mode 100644
index 0000000..7560675
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.bootstrap;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.apache.asterix.algebra.operators.physical.CommitRuntime;
+import org.apache.asterix.api.common.AsterixAppRuntimeContext;
+import org.apache.asterix.common.api.ILocalResourceMetadata;
+import org.apache.asterix.common.config.AsterixStorageProperties;
+import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider;
+import org.apache.asterix.common.context.DatasetLifecycleManager;
+import org.apache.asterix.common.context.TransactionSubsystemProvider;
+import org.apache.asterix.common.dataflow.AsterixLSMInsertDeleteOperatorNodePushable;
+import org.apache.asterix.common.dataflow.AsterixLSMTreeInsertDeleteOperatorDescriptor;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
+import org.apache.asterix.common.messaging.ResourceIdRequestMessage;
+import org.apache.asterix.common.messaging.ResourceIdRequestResponseMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessageCallback;
+import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
+import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
+import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import org.apache.asterix.formats.nontagged.AqlTypeTraitProvider;
+import org.apache.asterix.hyracks.bootstrap.NCApplicationEntryPoint;
+import org.apache.asterix.messaging.NCMessageBroker;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.utils.DatasetUtils;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
+import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory;
+import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerProvider;
+import org.apache.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadata;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
+import org.apache.asterix.transaction.management.service.logging.LogReader;
+import org.apache.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
+import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
+import org.apache.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksJobletContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.control.common.controllers.NCConfig;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import org.apache.hyracks.dataflow.std.file.FileSplit;
+import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorNodePushable;
+import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
+import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
+import org.apache.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelper;
+import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
+import org.apache.hyracks.storage.common.file.ILocalResourceFactoryProvider;
+import org.apache.hyracks.storage.common.file.LocalResource;
+import org.apache.hyracks.test.support.TestUtils;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestNodeController {
+    // Constants
+    public static final int DEFAULT_HYRACKS_CC_CLIENT_PORT = 1098;
+    public static final int DEFAULT_HYRACKS_CC_CLUSTER_PORT = 1099;
+    public static final int KB32 = 32768;
+    public static final int PARTITION = 0;
+    public static final TransactionSubsystemProvider TXN_SUBSYSTEM_PROVIDER = new TransactionSubsystemProvider();
+    // Final
+    private final String nodeName;
+    private final String ioDeviceDirList;
+    // Mutables
+    private JobId jobId;
+    private long jobCounter = 0L;
+    private long resourceCounter = 0;
+    private NodeControllerService nodeControllerService;
+    private IIndexLifecycleManagerProvider ilcmp;
+    private IHyracksJobletContext jobletCtx;
+    private AsterixStorageProperties storageProperties;
+
+    public TestNodeController(String nodeName, String ioDeviceDirList)
+            throws AsterixException, HyracksException, ACIDException {
+        this.nodeName = nodeName;
+        this.ioDeviceDirList = ioDeviceDirList;
+    }
+
+    public void init() throws Exception {
+        String ncName = nodeName;
+        NCConfig ncConfig = new NCConfig();
+        ncConfig.nodeId = ncName;
+        ncConfig.resultSweepThreshold = 1000;
+        ncConfig.appArgs = Arrays.asList("-virtual-NC");
+        ncConfig.ioDevices = ioDeviceDirList;
+        ncConfig.ccHost = "localhost";
+        ncConfig.ccPort = DEFAULT_HYRACKS_CC_CLUSTER_PORT;
+        ncConfig.clusterNetIPAddress = "127.0.0.1";
+        ncConfig.dataIPAddress = "127.0.0.1";
+        ncConfig.resultIPAddress = "127.0.0.1";
+        ncConfig.resultTTL = 30000;
+        String[] nodeStores = ioDeviceDirList.split(",");
+        for (int p = 0; p < nodeStores.length; p++) {
+            // create IO devices based on stores
+            File ioDeviceDir = new File(nodeStores[p]);
+            ioDeviceDir.mkdirs();
+        }
+        ncConfig.appNCMainClass = NCApplicationEntryPoint.class.getName();
+        nodeControllerService = new NodeControllerService(ncConfig);
+        nodeControllerService.testStart();
+        NCMessageBroker messageBroker = (NCMessageBroker) Mockito
+                .spy(nodeControllerService.getApplicationContext().getMessageBroker());
+        Mockito.doNothing().when(messageBroker).reportMaxResourceId();
+        Mockito.doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                IApplicationMessageCallback callback = invocation.getArgumentAt(1, IApplicationMessageCallback.class);
+                ResourceIdRequestResponseMessage message = new ResourceIdRequestResponseMessage();
+                message.setId(resourceCounter++);
+                callback.deliverMessageResponse(message);
+                return null;
+            }
+        }).when(messageBroker).sendMessage(Mockito.any(ResourceIdRequestMessage.class), Mockito.any());
+        nodeControllerService.getApplicationContext().setMessageBroker(messageBroker);
+        nodeControllerService.testStartComplete();
+        ilcmp = Mockito.mock(IIndexLifecycleManagerProvider.class);
+        Mockito.when(ilcmp.getLifecycleManager(Mockito.any()))
+                .thenReturn(getAppRuntimeContext().getDatasetLifecycleManager());
+        getAppRuntimeContext().getLSMIOScheduler();
+        jobletCtx = Mockito.mock(IHyracksJobletContext.class);
+        Mockito.when(jobletCtx.getApplicationContext()).thenReturn(nodeControllerService.getApplicationContext());
+        Mockito.when(jobletCtx.getJobId()).thenAnswer(new Answer<JobId>() {
+            @Override
+            public JobId answer(InvocationOnMock invocation) throws Throwable {
+                return jobId;
+            }
+        });
+        storageProperties = getAppRuntimeContext().getStorageProperties();
+    }
+
+    public void deInit() throws Exception {
+        nodeControllerService.testStop();
+    }
+
+    public org.apache.asterix.common.transactions.JobId getTxnJobId() {
+        return new org.apache.asterix.common.transactions.JobId((int) jobId.getId());
+    }
+
+    public AsterixLSMInsertDeleteOperatorNodePushable getInsertPipeline(IHyracksTaskContext ctx, Dataset dataset,
+            IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType,
+            ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties, int[] filterFields)
+            throws AlgebricksException {
+        PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
+                mergePolicyFactory, mergePolicyProperties, filterFields);
+        IndexOperation op = IndexOperation.INSERT;
+        IModificationOperationCallbackFactory modOpCallbackFactory = new PrimaryIndexModificationOperationCallbackFactory(
+                getTxnJobId(), dataset.getDatasetId(), primaryIndexInfo.primaryKeyIndexes, TXN_SUBSYSTEM_PROVIDER, op,
+                ResourceType.LSM_BTREE, true);
+        AsterixLSMTreeInsertDeleteOperatorDescriptor indexOpDesc = getInsertOperatorDesc(primaryIndexInfo,
+                modOpCallbackFactory);
+        LSMBTreeDataflowHelperFactory dataflowHelperFactory = getPrimaryIndexDataflowHelperFactory(ctx,
+                primaryIndexInfo);
+        Mockito.when(indexOpDesc.getIndexDataflowHelperFactory()).thenReturn(dataflowHelperFactory);
+        IRecordDescriptorProvider recordDescProvider = primaryIndexInfo.getInsertRecordDescriptorProvider();
+        AsterixLSMInsertDeleteOperatorNodePushable insertOp = new AsterixLSMInsertDeleteOperatorNodePushable(
+                indexOpDesc, ctx, PARTITION, primaryIndexInfo.primaryIndexInsertFieldsPermutations, recordDescProvider,
+                op, true);
+        CommitRuntime commitOp = new CommitRuntime(ctx, getTxnJobId(), dataset.getDatasetId(),
+                primaryIndexInfo.primaryKeyIndexes, false, true, PARTITION);
+        insertOp.setOutputFrameWriter(0, commitOp, primaryIndexInfo.rDesc);
+        commitOp.setInputRecordDescriptor(0, primaryIndexInfo.rDesc);
+        return insertOp;
+    }
+
+    public IPushRuntime getFullScanPipeline(IFrameWriter countOp, IHyracksTaskContext ctx, Dataset dataset,
+            IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType,
+            NoMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties, int[] filterFields)
+            throws HyracksDataException, AlgebricksException {
+        IPushRuntime emptyTupleOp = new EmptyTupleSourceRuntimeFactory().createPushRuntime(ctx);
+        JobSpecification spec = new JobSpecification();
+        PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
+                mergePolicyFactory, mergePolicyProperties, filterFields);
+        LSMBTreeDataflowHelperFactory indexDataflowHelperFactory = getPrimaryIndexDataflowHelperFactory(ctx,
+                primaryIndexInfo);
+        BTreeSearchOperatorDescriptor searchOpDesc = new BTreeSearchOperatorDescriptor(spec, primaryIndexInfo.rDesc,
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                primaryIndexInfo.fileSplitProvider, primaryIndexInfo.primaryIndexTypeTraits,
+                primaryIndexInfo.primaryIndexComparatorFactories, primaryIndexInfo.primaryIndexBloomFilterKeyFields,
+                primaryIndexInfo.primaryKeyIndexes, primaryIndexInfo.primaryKeyIndexes, true, true,
+                indexDataflowHelperFactory, false, false, null, NoOpOperationCallbackFactory.INSTANCE, filterFields,
+                filterFields);
+        BTreeSearchOperatorNodePushable searchOp = new BTreeSearchOperatorNodePushable(searchOpDesc, ctx, 0,
+                primaryIndexInfo.getSearchRecordDescriptorProvider(), /*primaryIndexInfo.primaryKeyIndexes*/null,
+                /*primaryIndexInfo.primaryKeyIndexes*/null, true, true, filterFields, filterFields);
+        emptyTupleOp.setFrameWriter(0, searchOp,
+                primaryIndexInfo.getSearchRecordDescriptorProvider().getInputRecordDescriptor(null, 0));
+        searchOp.setOutputFrameWriter(0, countOp, primaryIndexInfo.rDesc);
+        return emptyTupleOp;
+    }
+
+    public LogReader getTransactionLogReader(boolean isRecoveryMode) {
+        return (LogReader) getTransactionSubsystem().getLogManager().getLogReader(isRecoveryMode);
+    }
+
+    public JobId newJobId() {
+        jobId = new JobId(jobCounter++);
+        return jobId;
+    }
+
+    public AsterixLSMTreeInsertDeleteOperatorDescriptor getInsertOperatorDesc(PrimaryIndexInfo primaryIndexInfo,
+            IModificationOperationCallbackFactory modOpCallbackFactory) {
+        AsterixLSMTreeInsertDeleteOperatorDescriptor indexOpDesc = Mockito
+                .mock(AsterixLSMTreeInsertDeleteOperatorDescriptor.class);
+        Mockito.when(indexOpDesc.getLifecycleManagerProvider()).thenReturn(ilcmp);
+        Mockito.when(indexOpDesc.getStorageManager()).thenReturn(AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER);
+        Mockito.when(indexOpDesc.getFileSplitProvider()).thenReturn(primaryIndexInfo.fileSplitProvider);
+        Mockito.when(indexOpDesc.getLocalResourceFactoryProvider())
+                .thenReturn(primaryIndexInfo.localResourceFactoryProvider);
+        Mockito.when(indexOpDesc.getTreeIndexTypeTraits()).thenReturn(primaryIndexInfo.primaryIndexTypeTraits);
+        Mockito.when(indexOpDesc.getTreeIndexComparatorFactories())
+                .thenReturn(primaryIndexInfo.primaryIndexComparatorFactories);
+        Mockito.when(indexOpDesc.getTreeIndexBloomFilterKeyFields())
+                .thenReturn(primaryIndexInfo.primaryIndexBloomFilterKeyFields);
+        Mockito.when(indexOpDesc.getModificationOpCallbackFactory()).thenReturn(modOpCallbackFactory);
+        return indexOpDesc;
+    }
+
+    public TreeIndexCreateOperatorDescriptor getIndexCreateOpDesc(PrimaryIndexInfo primaryIndexInfo) {
+        TreeIndexCreateOperatorDescriptor indexOpDesc = Mockito.mock(TreeIndexCreateOperatorDescriptor.class);
+        Mockito.when(indexOpDesc.getLifecycleManagerProvider()).thenReturn(ilcmp);
+        Mockito.when(indexOpDesc.getStorageManager()).thenReturn(AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER);
+        Mockito.when(indexOpDesc.getFileSplitProvider()).thenReturn(primaryIndexInfo.fileSplitProvider);
+        Mockito.when(indexOpDesc.getLocalResourceFactoryProvider())
+                .thenReturn(primaryIndexInfo.localResourceFactoryProvider);
+        Mockito.when(indexOpDesc.getTreeIndexTypeTraits()).thenReturn(primaryIndexInfo.primaryIndexTypeTraits);
+        Mockito.when(indexOpDesc.getTreeIndexComparatorFactories())
+                .thenReturn(primaryIndexInfo.primaryIndexComparatorFactories);
+        Mockito.when(indexOpDesc.getTreeIndexBloomFilterKeyFields())
+                .thenReturn(primaryIndexInfo.primaryIndexBloomFilterKeyFields);
+        return indexOpDesc;
+    }
+
+    public ConstantFileSplitProvider getFileSplitProvider(Dataset dataset) {
+        FileSplit fileSplit = new FileSplit(nodeName,
+                dataset.getDataverseName() + File.separator + dataset.getDatasetName());
+        return new ConstantFileSplitProvider(new FileSplit[] { fileSplit });
+    }
+
+    public ILocalResourceFactoryProvider getPrimaryIndexLocalResourceMetadataProvider(Dataset dataset,
+            ITypeTraits[] primaryIndexTypeTraits, IBinaryComparatorFactory[] primaryIndexComparatorFactories,
+            int[] primaryIndexBloomFilterKeyFields, ILSMMergePolicyFactory mergePolicyFactory,
+            Map<String, String> mergePolicyProperties, ITypeTraits[] filterTypeTraits,
+            IBinaryComparatorFactory[] filterCmpFactories, int[] btreeFields, int[] filterFields) {
+        ILocalResourceMetadata localResourceMetadata = new LSMBTreeLocalResourceMetadata(primaryIndexTypeTraits,
+                primaryIndexComparatorFactories, primaryIndexBloomFilterKeyFields, true, dataset.getDatasetId(),
+                mergePolicyFactory, mergePolicyProperties, filterTypeTraits, filterCmpFactories, btreeFields,
+                filterFields);
+        ILocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(
+                localResourceMetadata, LocalResource.LSMBTreeResource);
+        return localResourceFactoryProvider;
+    }
+
+    public LSMBTreeDataflowHelper getPrimaryIndexDataflowHelper(IHyracksTaskContext ctx,
+            PrimaryIndexInfo primaryIndexInfo, TreeIndexCreateOperatorDescriptor indexOpDesc)
+            throws AlgebricksException {
+        LSMBTreeDataflowHelperFactory dataflowHelperFactory = new LSMBTreeDataflowHelperFactory(
+                new AsterixVirtualBufferCacheProvider(primaryIndexInfo.dataset.getDatasetId()),
+                primaryIndexInfo.mergePolicyFactory, primaryIndexInfo.mergePolicyProperties,
+                new PrimaryIndexOperationTrackerProvider(primaryIndexInfo.dataset.getDatasetId()),
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+                storageProperties.getBloomFilterFalsePositiveRate(), true, primaryIndexInfo.filterTypeTraits,
+                primaryIndexInfo.filterCmpFactories, primaryIndexInfo.btreeFields, primaryIndexInfo.filterFields, true);
+        IndexDataflowHelper dataflowHelper = dataflowHelperFactory.createIndexDataflowHelper(indexOpDesc, ctx, 0);
+        return (LSMBTreeDataflowHelper) dataflowHelper;
+    }
+
+    public LSMBTreeDataflowHelperFactory getPrimaryIndexDataflowHelperFactory(IHyracksTaskContext ctx,
+            PrimaryIndexInfo primaryIndexInfo) throws AlgebricksException {
+        return new LSMBTreeDataflowHelperFactory(
+                new AsterixVirtualBufferCacheProvider(primaryIndexInfo.dataset.getDatasetId()),
+                primaryIndexInfo.mergePolicyFactory, primaryIndexInfo.mergePolicyProperties,
+                new PrimaryIndexOperationTrackerProvider(primaryIndexInfo.dataset.getDatasetId()),
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+                storageProperties.getBloomFilterFalsePositiveRate(), true, primaryIndexInfo.filterTypeTraits,
+                primaryIndexInfo.filterCmpFactories, primaryIndexInfo.btreeFields, primaryIndexInfo.filterFields, true);
+    }
+
+    public LSMBTreeDataflowHelper getPrimaryIndexDataflowHelper(Dataset dataset, IAType[] primaryKeyTypes,
+            ARecordType recordType, ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory,
+            Map<String, String> mergePolicyProperties, int[] filterFields)
+            throws AlgebricksException, HyracksDataException {
+        PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
+                mergePolicyFactory, mergePolicyProperties, filterFields);
+        TreeIndexCreateOperatorDescriptor indexOpDesc = getIndexCreateOpDesc(primaryIndexInfo);
+        return getPrimaryIndexDataflowHelper(createTestContext(), primaryIndexInfo, indexOpDesc);
+    }
+
+    public void createPrimaryIndex(Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType,
+            ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties,
+            int[] filterFields) throws AlgebricksException, HyracksDataException {
+        PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
+                mergePolicyFactory, mergePolicyProperties, filterFields);
+        TreeIndexCreateOperatorDescriptor indexOpDesc = getIndexCreateOpDesc(primaryIndexInfo);
+        LSMBTreeDataflowHelper dataflowHelper = getPrimaryIndexDataflowHelper(createTestContext(), primaryIndexInfo,
+                indexOpDesc);
+        dataflowHelper.create();
+    }
+
+    private int[] createPrimaryIndexBloomFilterFields(int length) {
+        int[] primaryIndexBloomFilterKeyFields = new int[length];
+        for (int j = 0; j < length; ++j) {
+            primaryIndexBloomFilterKeyFields[j] = j;
+        }
+        return primaryIndexBloomFilterKeyFields;
+    }
+
+    private IBinaryComparatorFactory[] createPrimaryIndexComparatorFactories(IAType[] primaryKeyTypes) {
+        IBinaryComparatorFactory[] primaryIndexComparatorFactories = new IBinaryComparatorFactory[primaryKeyTypes.length];
+        for (int j = 0; j < primaryKeyTypes.length; ++j) {
+            primaryIndexComparatorFactories[j] = AqlBinaryComparatorFactoryProvider.INSTANCE
+                    .getBinaryComparatorFactory(primaryKeyTypes[j], true);
+        }
+        return primaryIndexComparatorFactories;
+    }
+
+    private ISerializerDeserializer<?>[] createPrimaryIndexSerdes(int primaryIndexNumOfTupleFields,
+            IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType) {
+        int i = 0;
+        ISerializerDeserializer<?>[] primaryIndexSerdes = new ISerializerDeserializer<?>[primaryIndexNumOfTupleFields];
+        for (; i < primaryKeyTypes.length; i++) {
+            primaryIndexSerdes[i] = AqlSerializerDeserializerProvider.INSTANCE
+                    .getSerializerDeserializer(primaryKeyTypes[i]);
+        }
+        primaryIndexSerdes[i++] = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(recordType);
+        if (metaType != null) {
+            primaryIndexSerdes[i] = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(metaType);
+        }
+        return primaryIndexSerdes;
+    }
+
+    private ITypeTraits[] createPrimaryIndexTypeTraits(int primaryIndexNumOfTupleFields, IAType[] primaryKeyTypes,
+            ARecordType recordType, ARecordType metaType) {
+        ITypeTraits[] primaryIndexTypeTraits = new ITypeTraits[primaryIndexNumOfTupleFields];
+        int i = 0;
+        for (; i < primaryKeyTypes.length; i++) {
+            primaryIndexTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(primaryKeyTypes[i]);
+        }
+        primaryIndexTypeTraits[i++] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(recordType);
+        if (metaType != null) {
+            primaryIndexTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(metaType);
+        }
+        return primaryIndexTypeTraits;
+    }
+
+    public IHyracksTaskContext createTestContext() {
+        IHyracksTaskContext ctx = TestUtils.create(KB32);
+        ctx = Mockito.spy(ctx);
+        Mockito.when(ctx.getJobletContext()).thenReturn(jobletCtx);
+        Mockito.when(ctx.getIOManager()).thenReturn(nodeControllerService.getRootContext().getIOManager());
+        return ctx;
+    }
+
+    public TransactionSubsystem getTransactionSubsystem() {
+        return (TransactionSubsystem) ((AsterixAppRuntimeContext) nodeControllerService.getApplicationContext()
+                .getApplicationObject()).getTransactionSubsystem();
+    }
+
+    public ITransactionManager getTransactionManager() {
+        return getTransactionSubsystem().getTransactionManager();
+    }
+
+    public AsterixAppRuntimeContext getAppRuntimeContext() {
+        return (AsterixAppRuntimeContext) nodeControllerService.getApplicationContext().getApplicationObject();
+    }
+
+    public DatasetLifecycleManager getDatasetLifecycleManager() {
+        return (DatasetLifecycleManager) getAppRuntimeContext().getDatasetLifecycleManager();
+    }
+
+    @SuppressWarnings("unused")
+    private class PrimaryIndexInfo {
+        private Dataset dataset;
+        private IAType[] primaryKeyTypes;
+        private ARecordType recordType;
+        private ARecordType metaType;
+        private ILSMMergePolicyFactory mergePolicyFactory;
+        private Map<String, String> mergePolicyProperties;
+        private int[] filterFields;
+        private int primaryIndexNumOfTupleFields;
+        private IBinaryComparatorFactory[] primaryIndexComparatorFactories;
+        private ITypeTraits[] primaryIndexTypeTraits;
+        private ISerializerDeserializer<?>[] primaryIndexSerdes;
+        private int[] primaryIndexBloomFilterKeyFields;
+        private ITypeTraits[] filterTypeTraits;
+        private IBinaryComparatorFactory[] filterCmpFactories;
+        private int[] btreeFields;
+        private ILocalResourceFactoryProvider localResourceFactoryProvider;
+        private ConstantFileSplitProvider fileSplitProvider;
+        private RecordDescriptor rDesc;
+        private int[] primaryIndexInsertFieldsPermutations;
+        private int[] primaryKeyIndexes;
+
+        public PrimaryIndexInfo(Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType,
+                ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties,
+                int[] filterFields) throws AlgebricksException {
+            this.dataset = dataset;
+            this.primaryKeyTypes = primaryKeyTypes;
+            this.recordType = recordType;
+            this.metaType = metaType;
+            this.mergePolicyFactory = mergePolicyFactory;
+            this.mergePolicyProperties = mergePolicyProperties;
+            this.filterFields = filterFields;
+            primaryIndexNumOfTupleFields = primaryKeyTypes.length + (1 + ((metaType == null) ? 0 : 1));
+            primaryIndexTypeTraits = createPrimaryIndexTypeTraits(primaryIndexNumOfTupleFields, primaryKeyTypes,
+                    recordType, metaType);
+            primaryIndexComparatorFactories = createPrimaryIndexComparatorFactories(primaryKeyTypes);
+            primaryIndexBloomFilterKeyFields = createPrimaryIndexBloomFilterFields(primaryKeyTypes.length);
+            filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recordType);
+            filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset, recordType,
+                    NonTaggedDataFormat.INSTANCE.getBinaryComparatorFactoryProvider());
+            btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
+            localResourceFactoryProvider = getPrimaryIndexLocalResourceMetadataProvider(dataset, primaryIndexTypeTraits,
+                    primaryIndexComparatorFactories, primaryIndexBloomFilterKeyFields, mergePolicyFactory,
+                    mergePolicyProperties, filterTypeTraits, filterCmpFactories, btreeFields, filterFields);
+            fileSplitProvider = getFileSplitProvider(dataset);
+            primaryIndexSerdes = createPrimaryIndexSerdes(primaryIndexNumOfTupleFields, primaryKeyTypes, recordType,
+                    metaType);
+            rDesc = new RecordDescriptor(primaryIndexSerdes, primaryIndexTypeTraits);
+            primaryIndexInsertFieldsPermutations = new int[primaryIndexNumOfTupleFields];
+            for (int i = 0; i < primaryIndexNumOfTupleFields; i++) {
+                primaryIndexInsertFieldsPermutations[i] = i;
+            }
+            primaryKeyIndexes = new int[primaryKeyTypes.length];
+            for (int i = 0; i < primaryKeyIndexes.length; i++) {
+                primaryKeyIndexes[i] = i;
+            }
+        }
+
+        public IRecordDescriptorProvider getInsertRecordDescriptorProvider() {
+            IRecordDescriptorProvider rDescProvider = Mockito.mock(IRecordDescriptorProvider.class);
+            Mockito.when(rDescProvider.getInputRecordDescriptor(Mockito.any(), Mockito.anyInt())).thenReturn(rDesc);
+            return rDescProvider;
+        }
+
+        public IRecordDescriptorProvider getSearchRecordDescriptorProvider() {
+            ITypeTraits[] primaryKeyTypeTraits = new ITypeTraits[primaryKeyTypes.length];
+            ISerializerDeserializer<?>[] primaryKeySerdes = new ISerializerDeserializer<?>[primaryKeyTypes.length];
+            for (int i = 0; i < primaryKeyTypes.length; i++) {
+                primaryKeyTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(primaryKeyTypes[i]);
+                primaryKeySerdes[i] = AqlSerializerDeserializerProvider.INSTANCE
+                        .getSerializerDeserializer(primaryKeyTypes[i]);
+            }
+            RecordDescriptor searchRecDesc = new RecordDescriptor(primaryKeySerdes, primaryKeyTypeTraits);
+            IRecordDescriptorProvider rDescProvider = Mockito.mock(IRecordDescriptorProvider.class);
+            Mockito.when(rDescProvider.getInputRecordDescriptor(Mockito.any(), Mockito.anyInt()))
+                    .thenReturn(searchRecDesc);
+            return rDescProvider;
+        }
+    }
+
+    public RecordDescriptor getSearchOutputDesc(IAType[] keyTypes, ARecordType recordType, ARecordType metaType) {
+        int primaryIndexNumOfTupleFields = keyTypes.length + (1 + ((metaType == null) ? 0 : 1));
+        ITypeTraits[] primaryIndexTypeTraits = createPrimaryIndexTypeTraits(primaryIndexNumOfTupleFields, keyTypes,
+                recordType, metaType);
+        ISerializerDeserializer<?>[] primaryIndexSerdes = createPrimaryIndexSerdes(primaryIndexNumOfTupleFields,
+                keyTypes, recordType, metaType);
+        return new RecordDescriptor(primaryIndexSerdes, primaryIndexTypeTraits);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 2f8def1..aca5dc3 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -260,7 +260,8 @@
         init();
 
         datasetNetworkManager.start();
-        IIPCHandle ccIPCHandle = ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort), ncConfig.retries);
+        IIPCHandle ccIPCHandle = ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort),
+                ncConfig.retries);
         this.ccs = new ClusterControllerRemoteProxy(ccIPCHandle);
         HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
         for (int i = 0; i < gcInfos.length; ++i) {
@@ -270,12 +271,11 @@
         // Use "public" versions of network addresses and ports
         NetworkAddress datasetAddress = datasetNetworkManager.getPublicNetworkAddress();
         NetworkAddress netAddress = netManager.getPublicNetworkAddress();
-        ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netAddress,
-                datasetAddress, osMXBean.getName(), osMXBean.getArch(), osMXBean
-                        .getVersion(), osMXBean.getAvailableProcessors(), runtimeMXBean.getVmName(), runtimeMXBean
-                        .getVmVersion(), runtimeMXBean.getVmVendor(), runtimeMXBean.getClassPath(), runtimeMXBean
-                        .getLibraryPath(), runtimeMXBean.getBootClassPath(), runtimeMXBean.getInputArguments(),
-                runtimeMXBean.getSystemProperties(), hbSchema));
+        ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netAddress, datasetAddress,
+                osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean.getAvailableProcessors(),
+                runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(),
+                runtimeMXBean.getClassPath(), runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(),
+                runtimeMXBean.getInputArguments(), runtimeMXBean.getSystemProperties(), hbSchema));
 
         synchronized (this) {
             while (registrationPending) {
@@ -594,4 +594,37 @@
             }
         }
     }
+
+    /*
+     * Methods below are used for unit tests.
+     */
+
+    public void testStart() throws Exception {
+        LOGGER.log(Level.INFO, "Starting NodeControllerService");
+        startApplication();
+        init();
+        LOGGER.log(Level.INFO, "Started NodeControllerService");
+    }
+
+    public void testStartComplete() throws Exception {
+        ncAppEntryPoint.notifyStartupComplete();
+    }
+
+    public synchronized void testStop() throws Exception {
+        if (!shuttedDown) {
+            LOGGER.log(Level.INFO, "Stopping NodeControllerService");
+            executor.shutdownNow();
+            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
+                LOGGER.log(Level.SEVERE, "Some jobs failed to exit, continuing shutdown abnormally");
+            }
+            partitionManager.close();
+            datasetPartitionManager.close();
+            datasetNetworkManager.stop();
+            if (ncAppEntryPoint != null) {
+                ncAppEntryPoint.stop();
+            }
+            LOGGER.log(Level.INFO, "Stopped NodeControllerService");
+            shuttedDown = true;
+        }
+    }
 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/962
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I3b9aa8de758b7d26ca34868b16e5ce693e0c0243
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com>
