abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1944

Change subject: Un-Singleton ClusterStateManager
......................................................................

Un-Singleton ClusterStateManager

Change-Id: Id6532245033ac4c6f6aa9f193539944eecb832f7
---
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
M 
asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ClusterProperties.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
M 
asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/hints/DatasetHints.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
R 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
36 files changed, 350 insertions(+), 163 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/44/1944/1

diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
index 7f35d08..4550ba6 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
@@ -260,10 +260,10 @@
             keyAccessExpression = null;
             keyAccessScalarFunctionCallExpression = null;
         }
-        FeedDataSource feedDataSource = new FeedDataSource(sourceFeed, aqlId, 
targetDataset, feedOutputType, metaType,
-                pkTypes, partitioningKeys, 
keyAccessScalarFunctionCallExpression, sourceFeed.getFeedId(),
-                FeedRuntimeType.valueOf(subscriptionLocation), 
locations.split(","), context.getComputationNodeDomain(),
-                feedConnection);
+        FeedDataSource feedDataSource = new FeedDataSource((MetadataProvider) 
context.getMetadataProvider(), sourceFeed,
+                aqlId, targetDataset, feedOutputType, metaType, pkTypes, 
keyAccessScalarFunctionCallExpression,
+                sourceFeed.getFeedId(), 
FeedRuntimeType.valueOf(subscriptionLocation), locations.split(","),
+                context.getComputationNodeDomain(), feedConnection);
         
feedDataSource.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, 
feedPolicy);
         return feedDataSource;
     }
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java
index 1d47095..c3f01e8 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java
@@ -38,7 +38,6 @@
 import org.apache.asterix.metadata.dataset.hints.DatasetHints;
 import org.apache.asterix.metadata.entities.Dataverse;
 import org.apache.asterix.metadata.utils.MetadataConstants;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
@@ -53,7 +52,7 @@
     public void validateOperation(ICcApplicationContext appCtx, Dataverse 
defaultDataverse, Statement stmt)
             throws AsterixException {
 
-        final IClusterStateManager clusterStateManager = 
ClusterStateManager.INSTANCE;
+        final IClusterStateManager clusterStateManager = 
appCtx.getClusterStateManager();
         final IGlobalRecoveryManager globalRecoveryManager = 
appCtx.getGlobalRecoveryManager();
         if (!(clusterStateManager.getState().equals(ClusterState.ACTIVE)
                 && globalRecoveryManager.isRecoveryCompleted())) {
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index 2799765..71c67f4 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -32,10 +32,10 @@
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.config.PropertiesAccessor;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.hyracks.bootstrap.CCApplication;
 import org.apache.asterix.hyracks.bootstrap.NCApplication;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.application.ICCApplication;
 import org.apache.hyracks.api.application.INCApplication;
@@ -116,7 +116,7 @@
             thread.join();
         }
         // Wait until cluster becomes active
-        ClusterStateManager.INSTANCE.waitForState(ClusterState.ACTIVE);
+        ((ICcApplicationContext) 
cc.getApplicationContext()).getClusterStateManager().waitForState(ClusterState.ACTIVE);
         hcc = new HyracksConnection(cc.getConfig().getClientListenAddress(), 
cc.getConfig().getClientListenPort());
         this.ncs = nodeControllers.toArray(new 
NodeControllerService[nodeControllers.size()]);
     }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java
index 82e8f7a..218a3ea 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java
@@ -28,8 +28,8 @@
 import java.util.logging.Logger;
 import java.util.regex.Pattern;
 
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.runtime.utils.CcApplicationContext;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.hyracks.api.config.IOption;
 import org.apache.hyracks.api.config.Section;
 import org.apache.hyracks.control.common.config.ConfigUtils;
@@ -60,9 +60,11 @@
     protected static final String VERSION_URI_KEY = "versionUri";
     protected static final String DIAGNOSTICS_URI_KEY = "diagnosticsUri";
     private final ObjectMapper om = new ObjectMapper();
+    protected final ICcApplicationContext appCtx;
 
-    public ClusterApiServlet(ConcurrentMap<String, Object> ctx, String... 
paths) {
+    public ClusterApiServlet(ICcApplicationContext appCtx, 
ConcurrentMap<String, Object> ctx, String... paths) {
         super(ctx, paths);
+        this.appCtx = appCtx;
     }
 
     @Override
@@ -94,11 +96,11 @@
     }
 
     protected ObjectNode getClusterStateSummaryJSON() {
-        return ClusterStateManager.INSTANCE.getClusterStateSummary();
+        return appCtx.getClusterStateManager().getClusterStateSummary();
     }
 
     protected ObjectNode getClusterStateJSON(IServletRequest request, String 
pathToNode) {
-        ObjectNode json = 
ClusterStateManager.INSTANCE.getClusterStateDescription();
+        ObjectNode json = 
appCtx.getClusterStateManager().getClusterStateDescription();
         CcApplicationContext appConfig = (CcApplicationContext) 
ctx.get(ASTERIX_APP_CONTEXT_INFO_ATTR);
         json.putPOJO("config", 
ConfigUtils.getSectionOptionsForJSON(appConfig.getServiceContext().getAppConfig(),
                 Section.COMMON, getConfigSelector()));
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java
index 52d4d67..848e1f1 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java
@@ -26,6 +26,7 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.http.api.IServletRequest;
 import org.apache.hyracks.http.api.IServletResponse;
@@ -41,8 +42,9 @@
     private static final Logger LOGGER = 
Logger.getLogger(ClusterControllerDetailsApiServlet.class.getName());
     private final ObjectMapper om = new ObjectMapper();
 
-    public ClusterControllerDetailsApiServlet(ConcurrentMap<String, Object> 
ctx, String... paths) {
-        super(ctx, paths);
+    public ClusterControllerDetailsApiServlet(ICcApplicationContext appCtx, 
ConcurrentMap<String, Object> ctx,
+            String... paths) {
+        super(appCtx, ctx, paths);
     }
 
     @Override
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
index dcd0e70..6e62c95 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
@@ -49,11 +49,9 @@
 
 public class DiagnosticsApiServlet extends NodeControllerDetailsApiServlet {
     private static final Logger LOGGER = 
Logger.getLogger(DiagnosticsApiServlet.class.getName());
-    private final ICcApplicationContext appCtx;
 
     public DiagnosticsApiServlet(ConcurrentMap<String, Object> ctx, String[] 
paths, ICcApplicationContext appCtx) {
-        super(ctx, paths);
-        this.appCtx = appCtx;
+        super(appCtx, ctx, paths);
     }
 
     @Override
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
index d9757c7..d08204c 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
@@ -29,7 +29,9 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.runtime.utils.ClusterStateManager;
+import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.http.api.IServletRequest;
 import org.apache.hyracks.http.api.IServletResponse;
@@ -48,8 +50,9 @@
     private static final Logger LOGGER = 
Logger.getLogger(NodeControllerDetailsApiServlet.class.getName());
     private final ObjectMapper om = new ObjectMapper();
 
-    public NodeControllerDetailsApiServlet(ConcurrentMap<String, Object> ctx, 
String... paths) {
-        super(ctx, paths);
+    public NodeControllerDetailsApiServlet(ICcApplicationContext appCtx, 
ConcurrentMap<String, Object> ctx,
+            String... paths) {
+        super(appCtx, ctx, paths);
         om.enable(SerializationFeature.INDENT_OUTPUT);
     }
 
@@ -204,8 +207,9 @@
         String dump = hcc.getThreadDump(node);
         if (dump == null) {
             // check to see if this is a node that is simply down
-            throw ClusterStateManager.INSTANCE.getNodePartitions(node) != null 
? new IllegalStateException()
-                    : new IllegalArgumentException();
+            IClusterStateManager csm = appCtx.getClusterStateManager();
+            ClusterPartition[] cp = csm.getNodePartitions(node);
+            throw cp != null ? new IllegalStateException() : new 
IllegalArgumentException();
         }
         return (ObjectNode) om.readTree(dump);
     }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index 1cec616..08f520a 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -39,7 +39,6 @@
 import org.apache.asterix.lang.common.base.IParser;
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.metadata.MetadataManager;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
 import org.apache.asterix.translator.IStatementExecutor.Stats;
@@ -62,6 +61,7 @@
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+
 import io.netty.handler.codec.http.HttpResponseStatus;
 
 public class QueryServiceServlet extends AbstractQueryApiServlet {
@@ -445,7 +445,7 @@
     protected void executeStatement(String statementsText, SessionOutput 
sessionOutput, ResultDelivery delivery,
             IStatementExecutor.Stats stats, RequestParameters param, String 
handleUrl, long[] outExecStartEnd)
             throws Exception {
-        IClusterManagementWork.ClusterState clusterState = 
ClusterStateManager.INSTANCE.getState();
+        IClusterManagementWork.ClusterState clusterState = 
appCtx.getClusterStateManager().getState();
         if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) {
             // using a plain IllegalStateException here to get into the right 
catch clause for a 500
             throw new IllegalStateException("Cannot execute request, cluster 
is " + clusterState);
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java
index fdd106d..f940145 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java
@@ -27,8 +27,9 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.config.GlobalConfig;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.http.api.IServletRequest;
 import org.apache.hyracks.http.api.IServletResponse;
@@ -48,14 +49,17 @@
     public static final String NCSERVICE_PID = "ncservice_pid";
     public static final String INI = "ini";
     public static final String PID = "pid";
+    private final IApplicationContext appCtx;
 
-    public ShutdownApiServlet(ConcurrentMap<String, Object> ctx, String[] 
paths) {
+    public ShutdownApiServlet(IApplicationContext appCtx, 
ConcurrentMap<String, Object> ctx, String[] paths) {
         super(ctx, paths);
+        this.appCtx = appCtx;
     }
 
     @Override
     protected void post(IServletRequest request, IServletResponse response) {
         IHyracksClientConnection hcc = (IHyracksClientConnection) 
ctx.get(HYRACKS_CONNECTION_ATTR);
+        IClusterStateManager csm = appCtx.getClusterStateManager();
         boolean terminateNCServices = 
"true".equalsIgnoreCase(request.getParameter("all"));
         Thread t = new Thread(() -> {
             try {
@@ -78,7 +82,7 @@
         try {
             jsonObject.put("status", "SHUTTING_DOWN");
             jsonObject.put("date", new Date().toString());
-            ObjectNode clusterState = 
ClusterStateManager.INSTANCE.getClusterStateDescription();
+            ObjectNode clusterState = csm.getClusterStateDescription();
             ArrayNode ncs = (ArrayNode) clusterState.get("ncs");
             for (int i = 0; i < ncs.size(); i++) {
                 ObjectNode nc = (ObjectNode) ncs.get(i);
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
index e7919fa..90b9f46 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
@@ -28,7 +28,9 @@
 import org.apache.asterix.algebra.base.ILangExtension;
 import org.apache.asterix.api.http.server.ResultUtil;
 import org.apache.asterix.app.cc.CCExtensionManager;
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.api.IClusterManagementWork;
+import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
@@ -40,7 +42,6 @@
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.messaging.CCMessageBroker;
 import org.apache.asterix.metadata.MetadataManager;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.IStatementExecutorContext;
 import org.apache.asterix.translator.IStatementExecutorFactory;
@@ -149,7 +150,9 @@
         if (ccSrv.getNodeManager().getNodeControllerState(requestNodeId) == 
null) {
             return "Node is not registerted with the CC";
         }
-        final IClusterManagementWork.ClusterState clusterState = 
ClusterStateManager.INSTANCE.getState();
+        IApplicationContext appCtx = (IApplicationContext) 
ccSrv.getApplicationContext();
+        IClusterStateManager csm = appCtx.getClusterStateManager();
+        final IClusterManagementWork.ClusterState clusterState = 
csm.getState();
         if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) {
             return "Cannot execute request, cluster is " + clusterState;
         }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 7647881..d52ce84 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -34,6 +34,7 @@
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.config.ActiveProperties;
 import org.apache.asterix.common.config.AsterixExtension;
 import org.apache.asterix.common.config.BuildProperties;
@@ -73,6 +74,7 @@
 import org.apache.asterix.replication.recovery.RemoteRecoveryManager;
 import org.apache.asterix.replication.storage.ReplicaResourcesManager;
 import org.apache.asterix.runtime.transaction.GlobalResourceIdFactoryProvider;
+import org.apache.asterix.runtime.utils.ClusterStateManager;
 import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepositoryFactory;
 import org.apache.hyracks.api.application.INCServiceContext;
@@ -136,6 +138,8 @@
     private final NCExtensionManager ncExtensionManager;
     private final IStorageComponentProvider componentProvider;
 
+    private final ClusterStateManager clusterStateManager;
+
     public NCAppRuntimeContext(INCServiceContext ncServiceContext, 
List<AsterixExtension> extensions)
             throws AsterixException, InstantiationException, 
IllegalAccessException, ClassNotFoundException,
             IOException {
@@ -160,6 +164,7 @@
         ncExtensionManager = new NCExtensionManager(allExtensions);
         componentProvider = new StorageComponentProvider();
         resourceIdFactory = new 
GlobalResourceIdFactoryProvider(ncServiceContext).createResourceIdFactory();
+        clusterStateManager = new ClusterStateManager();
     }
 
     @Override
@@ -482,4 +487,9 @@
     public INCServiceContext getServiceContext() {
         return ncServiceContext;
     }
+
+    @Override
+    public IClusterStateManager getClusterStateManager() {
+        return clusterStateManager;
+    }
 }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index b97c014..e966d3d 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -56,6 +56,7 @@
 import org.apache.asterix.app.result.ResultHandle;
 import org.apache.asterix.app.result.ResultReader;
 import org.apache.asterix.common.api.IMetadataLockManager;
+import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.config.ClusterProperties;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
@@ -152,7 +153,6 @@
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.om.types.TypeSignature;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
 import 
org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
 import org.apache.asterix.translator.AbstractLangTranslator;
 import 
org.apache.asterix.translator.CompiledStatements.CompiledDeleteStatement;
@@ -706,7 +706,8 @@
 
     protected static String configureNodegroupForDataset(ICcApplicationContext 
appCtx, Map<String, String> hints,
             String dataverseName, String datasetName, MetadataProvider 
metadataProvider) throws Exception {
-        Set<String> allNodes = 
ClusterStateManager.INSTANCE.getParticipantNodes(true);
+        IClusterStateManager csm = appCtx.getClusterStateManager();
+        Set<String> allNodes = csm.getParticipantNodes(true);
         Set<String> selectedNodes = new LinkedHashSet<>();
         String hintValue = hints.get(DatasetNodegroupCardinalityHint.NAME);
         if (hintValue == null) {
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index e8636c8..c26156d 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -53,7 +53,6 @@
 import org.apache.asterix.api.http.servlet.ServletConstants;
 import org.apache.asterix.app.active.ActiveNotificationHandler;
 import org.apache.asterix.app.cc.CCExtensionManager;
-import org.apache.asterix.app.cc.ResourceIdManager;
 import org.apache.asterix.app.external.ExternalLibraryUtils;
 import org.apache.asterix.app.replication.FaultToleranceStrategyFactory;
 import org.apache.asterix.common.api.AsterixThreadFactory;
@@ -78,7 +77,6 @@
 import org.apache.asterix.metadata.lock.MetadataLockManager;
 import org.apache.asterix.runtime.job.resource.JobCapacityController;
 import org.apache.asterix.runtime.utils.CcApplicationContext;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.asterix.translator.IStatementExecutorContext;
 import org.apache.asterix.translator.IStatementExecutorFactory;
 import org.apache.hyracks.api.application.ICCServiceContext;
@@ -125,7 +123,7 @@
         ccServiceCtx.setThreadFactory(
                 new AsterixThreadFactory(ccServiceCtx.getThreadFactory(), new 
LifeCycleComponentManager()));
         ILibraryManager libraryManager = new ExternalLibraryManager();
-        ResourceIdManager resourceIdManager = new ResourceIdManager();
+
         IReplicationStrategy repStrategy = 
ClusterProperties.INSTANCE.getReplicationStrategy();
         IFaultToleranceStrategy ftStrategy = FaultToleranceStrategyFactory
                 .create(ClusterProperties.INSTANCE.getCluster(), repStrategy, 
ccServiceCtx);
@@ -133,10 +131,9 @@
         componentProvider = new StorageComponentProvider();
         GlobalRecoveryManager globalRecoveryManager = 
createGlobalRecoveryManager();
         statementExecutorCtx = new StatementExecutorContext();
-        appCtx = new CcApplicationContext(ccServiceCtx, getHcc(), 
libraryManager, resourceIdManager,
-                () -> MetadataManager.INSTANCE, globalRecoveryManager, 
ftStrategy, new ActiveNotificationHandler(),
-                componentProvider, new MetadataLockManager());
-        ClusterStateManager.INSTANCE.setCcAppCtx(appCtx);
+        appCtx = new CcApplicationContext(ccServiceCtx, getHcc(), 
libraryManager, () -> MetadataManager.INSTANCE,
+                globalRecoveryManager, ftStrategy, new 
ActiveNotificationHandler(), componentProvider,
+                new MetadataLockManager());
         ccExtensionManager = new CCExtensionManager(getExtensions());
         appCtx.setExtensionManager(ccExtensionManager);
         final CCConfig ccConfig = controllerService.getCCConfig();
@@ -302,15 +299,15 @@
             case Servlets.REBALANCE:
                 return new RebalanceApiServlet(ctx, paths, appCtx);
             case Servlets.SHUTDOWN:
-                return new ShutdownApiServlet(ctx, paths);
+                return new ShutdownApiServlet(appCtx, ctx, paths);
             case Servlets.VERSION:
                 return new VersionApiServlet(ctx, paths);
             case Servlets.CLUSTER_STATE:
-                return new ClusterApiServlet(ctx, paths);
+                return new ClusterApiServlet(appCtx, ctx, paths);
             case Servlets.CLUSTER_STATE_NODE_DETAIL:
-                return new NodeControllerDetailsApiServlet(ctx, paths);
+                return new NodeControllerDetailsApiServlet(appCtx, ctx, paths);
             case Servlets.CLUSTER_STATE_CC_DETAIL:
-                return new ClusterControllerDetailsApiServlet(ctx, paths);
+                return new ClusterControllerDetailsApiServlet(appCtx, ctx, 
paths);
             case Servlets.DIAGNOSTICS:
                 return new DiagnosticsApiServlet(ctx, paths, appCtx);
             case Servlets.ACTIVE_STATS:
@@ -331,7 +328,7 @@
     @Override
     public void startupCompleted() throws Exception {
         ccServiceCtx.getControllerService().getExecutor().submit(() -> {
-            ClusterStateManager.INSTANCE.waitForState(ClusterState.ACTIVE);
+            appCtx.getClusterStateManager().waitForState(ClusterState.ACTIVE);
             
ClusterManagerProvider.getClusterManager().notifyStartupCompleted();
             return null;
         });
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
index 66f76c5..0583508 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
@@ -32,6 +32,7 @@
 import org.apache.asterix.common.api.IClusterManagementWork;
 import org.apache.asterix.common.api.IClusterManagementWorkResponse;
 import org.apache.asterix.common.api.IClusterManagementWorkResponse.Status;
+import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.config.ClusterProperties;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.event.schema.cluster.Node;
@@ -42,7 +43,6 @@
 import org.apache.asterix.metadata.cluster.RemoveNodeWork;
 import org.apache.asterix.metadata.cluster.RemoveNodeWorkResponse;
 import org.apache.asterix.runtime.utils.CcApplicationContext;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.hyracks.api.application.IClusterLifecycleListener;
 import org.apache.hyracks.api.config.IOption;
 import org.apache.hyracks.api.exceptions.HyracksException;
@@ -70,10 +70,11 @@
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("NC: " + nodeId + " joined");
         }
-        ClusterStateManager.INSTANCE.addNCConfiguration(nodeId, 
ncConfiguration);
+        IClusterStateManager csm = appCtx.getClusterStateManager();
+        csm.addNCConfiguration(nodeId, ncConfiguration);
 
         //if metadata node rejoining, we need to rebind the proxy connection 
when it is active again.
-        if (!ClusterStateManager.INSTANCE.isMetadataNodeActive()) {
+        if (!csm.isMetadataNodeActive()) {
             MetadataManager.INSTANCE.rebindMetadataNode();
         }
 
@@ -99,10 +100,11 @@
             if (LOGGER.isLoggable(Level.INFO)) {
                 LOGGER.info("NC: " + deadNode + " left");
             }
-            ClusterStateManager.INSTANCE.removeNCConfiguration(deadNode);
+            IClusterStateManager csm = appCtx.getClusterStateManager();
+            csm.removeNCConfiguration(deadNode);
 
             //if metadata node failed, we need to rebind the proxy connection 
when it is active again
-            if (!ClusterStateManager.INSTANCE.isMetadataNodeActive()) {
+            if (!csm.isMetadataNodeActive()) {
                 MetadataManager.INSTANCE.rebindMetadataNode();
             }
         }
@@ -171,8 +173,9 @@
 
         List<String> addedNodes = new ArrayList<>();
         String asterixInstanceName = 
ClusterProperties.INSTANCE.getCluster().getInstanceName();
+        IClusterStateManager csm = appCtx.getClusterStateManager();
         for (int i = 0; i < nodesToAdd; i++) {
-            Node node = 
ClusterStateManager.INSTANCE.getAvailableSubstitutionNode();
+            Node node = csm.getAvailableSubstitutionNode();
             if (node != null) {
                 try {
                     ClusterManagerProvider.getClusterManager().addNode(appCtx, 
node);
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java
index 46968b4..2977a58 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java
@@ -25,13 +25,13 @@
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.IClusterManagementWork;
+import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.event.schema.cluster.Node;
 import org.apache.asterix.metadata.cluster.AddNodeWork;
 import org.apache.asterix.metadata.cluster.ClusterManagerProvider;
 import org.apache.asterix.metadata.cluster.RemoveNodeWork;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
 
 public class ClusterWorkExecutor implements Runnable {
 
@@ -69,9 +69,10 @@
                     }
                 }
 
+                IClusterStateManager csm = appCtx.getClusterStateManager();
                 Set<Node> addedNodes = new HashSet<>();
                 for (int i = 0; i < nodesToAdd; i++) {
-                    Node node = 
ClusterStateManager.INSTANCE.getAvailableSubstitutionNode();
+                    Node node = csm.getAvailableSubstitutionNode();
                     if (node != null) {
                         try {
                             
ClusterManagerProvider.getClusterManager().addNode(appCtx, node);
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index 9fc9940..8dc1b3e 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -33,6 +33,7 @@
 import org.apache.asterix.active.message.ActiveManagerMessage;
 import org.apache.asterix.active.message.ActiveManagerMessage.Kind;
 import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
+import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import 
org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor;
@@ -66,7 +67,6 @@
 import org.apache.asterix.metadata.feeds.LocationConstraint;
 import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
 import 
org.apache.asterix.runtime.job.listener.MultiTransactionJobletEventListenerFactory;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.asterix.runtime.utils.RuntimeUtils;
 import org.apache.asterix.translator.CompiledStatements;
 import org.apache.asterix.translator.IStatementExecutor;
@@ -134,8 +134,10 @@
 
     public static JobSpecification buildRemoveFeedStorageJob(MetadataProvider 
metadataProvider, Feed feed)
             throws AsterixException {
-        JobSpecification spec = 
RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
-        AlgebricksAbsolutePartitionConstraint allCluster = 
ClusterStateManager.INSTANCE.getClusterLocations();
+        ICcApplicationContext appCtx = 
metadataProvider.getApplicationContext();
+        JobSpecification spec = RuntimeUtils.createJobSpecification(appCtx);
+        IClusterStateManager csm = appCtx.getClusterStateManager();
+        AlgebricksAbsolutePartitionConstraint allCluster = 
csm.getClusterLocations();
         Set<String> nodes = new TreeSet<>();
         for (String node : allCluster.getLocations()) {
             nodes.add(node);
@@ -143,7 +145,7 @@
         AlgebricksAbsolutePartitionConstraint locations =
                 new AlgebricksAbsolutePartitionConstraint(nodes.toArray(new 
String[nodes.size()]));
         FileSplit[] feedLogFileSplits =
-                FeedUtils.splitsForAdapter(feed.getDataverseName(), 
feed.getFeedName(), locations);
+                FeedUtils.splitsForAdapter(appCtx, feed.getDataverseName(), 
feed.getFeedName(), locations);
         org.apache.hyracks.algebricks.common.utils.Pair<IFileSplitProvider, 
AlgebricksPartitionConstraint> spC =
                 
StoragePathUtil.splitProviderAndPartitionConstraints(feedLogFileSplits);
         FileRemoveOperatorDescriptor frod = new 
FileRemoveOperatorDescriptor(spec, spC.first, true);
@@ -273,9 +275,8 @@
             }
 
             // make connections between operators
-            for (Entry<ConnectorDescriptorId,
-                       Pair<Pair<IOperatorDescriptor, Integer>, 
Pair<IOperatorDescriptor, Integer>>> entry
-                       : subJob.getConnectorOperatorMap().entrySet()) {
+            for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, 
Integer>, Pair<IOperatorDescriptor, Integer>>> entry : subJob
+                    .getConnectorOperatorMap().entrySet()) {
                 ConnectorDescriptorId newId = 
connectorIdMapping.get(entry.getKey());
                 IConnectorDescriptor connDesc = 
jobSpec.getConnectorMap().get(newId);
                 Pair<IOperatorDescriptor, Integer> leftOp = 
entry.getValue().getLeft();
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index c1421c5..53a4f23 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -405,7 +405,10 @@
                     IndexType.BTREE, keyFieldNames, keyFieldSourceIndicators, 
keyFieldTypes, false, false, true,
                     MetadataUtil.PENDING_NO_OP);
             List<String> nodes = 
Collections.singletonList(ExecutionTestUtil.integrationUtil.ncs[0].getId());
-            FileSplit[] splits = 
SplitsAndConstraintsUtil.getIndexSplits(dataset, index.getIndexName(), nodes);
+            FileSplit[] splits = SplitsAndConstraintsUtil.getIndexSplits(
+                    ((ICcApplicationContext) 
ExecutionTestUtil.integrationUtil.cc.getApplicationContext())
+                            .getClusterStateManager(),
+                    dataset, index.getIndexName(), nodes);
             fileSplitProvider = new 
ConstantFileSplitProvider(Arrays.copyOfRange(splits, 0, 1));
         }
 
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
index 0aea84d..90dab96 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.common.api;
 
+import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.config.ActiveProperties;
 import org.apache.asterix.common.config.BuildProperties;
 import org.apache.asterix.common.config.CompilerProperties;
@@ -60,4 +61,9 @@
 
     IServiceContext getServiceContext();
 
+    /**
+     * @return the cluster state manager
+     */
+    IClusterStateManager getClusterStateManager();
+
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index 30675cd..b368c3b 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -19,12 +19,19 @@
 package org.apache.asterix.common.cluster;
 
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.event.schema.cluster.Node;
+import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.api.config.IOption;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
 
 public interface IClusterStateManager {
 
@@ -49,6 +56,7 @@
 
     /**
      * Updates all partitions of {@code nodeId} based on the {@code active} 
flag.
+     *
      * @param nodeId
      * @param active
      * @throws HyracksDataException
@@ -93,6 +101,7 @@
 
     /**
      * Blocks until the cluster state becomes {@code state}, or timeout is 
exhausted.
+     *
      * @return true if the desired state was reached before timeout occurred
      */
     boolean waitForState(ClusterState waitForState, long timeout, TimeUnit 
unit)
@@ -116,4 +125,106 @@
      * @throws HyracksDataException
      */
     void deregisterNodePartitions(String nodeId) throws HyracksDataException;
+
+    /**
+     * @return true if cluster is active, false otherwise
+     */
+    boolean isClusterActive();
+
+    /**
+     * @return the set of participant nodes
+     */
+    Set<String> getParticipantNodes();
+
+    /**
+     * Returns the IO devices configured for a Node Controller
+     *
+     * @param nodeId
+     *            unique identifier of the Node Controller
+     * @return a list of IO devices.
+     */
+    String[] getIODevices(String nodeId);
+
+    /**
+     * @return the constraint representing all the partitions of the cluster
+     */
+    AlgebricksAbsolutePartitionConstraint getClusterLocations();
+
+    /**
+     * @param excludePendingRemoval
+     *            true, if the desired set shouldn't have pending removal nodes
+     * @return the set of participant nodes
+     */
+    Set<String> getParticipantNodes(boolean excludePendingRemoval);
+
+    /**
+     * @param node
+     *            the node id
+     * @return the number of partitions on that node
+     */
+    int getNodePartitionsCount(String node);
+
+    /**
+     * @return a json object representing the cluster state summary
+     */
+    ObjectNode getClusterStateSummary();
+
+    /**
+     * @return a json object representing the cluster state description
+     */
+    ObjectNode getClusterStateDescription();
+
+    /**
+     * Set the cc application context
+     *
+     * @param appCtx
+     */
+    void setCcAppCtx(ICcApplicationContext appCtx);
+
+    /**
+     * @return the number of cluster nodes
+     */
+    int getNumberOfNodes();
+
+    /**
+     * Add node configuration
+     *
+     * @param nodeId
+     * @param ncConfiguration
+     * @throws HyracksException
+     */
+    void addNCConfiguration(String nodeId, Map<IOption, Object> 
ncConfiguration) throws HyracksException;
+
+    /**
+     * @return true if metadata node is active, false otherwise
+     */
+    boolean isMetadataNodeActive();
+
+    /**
+     * Remove configuration of a dead node
+     *
+     * @param deadNode
+     * @throws HyracksException
+     */
+    void removeNCConfiguration(String deadNode) throws HyracksException;
+
+    /**
+     * @return a substitution node or null
+     */
+    Node getAvailableSubstitutionNode();
+
+    /**
+     * Add node to the list of nodes pending removal
+     *
+     * @param nodeId
+     */
+    void removePending(String nodeId);
+
+    /**
+     * Deregister intention to remove node id
+     *
+     * @param nodeId
+     * @return
+     */
+    boolean cancelRemovePending(String nodeId);
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ClusterProperties.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ClusterProperties.java
index 0abb92f..b258a17 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ClusterProperties.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ClusterProperties.java
@@ -43,6 +43,8 @@
     private Cluster cluster;
 
     private ClusterProperties() {
+        Exception creation = new Exception("ClusterProperties is getting 
created");
+        creation.printStackTrace();
         InputStream is = 
this.getClass().getClassLoader().getResourceAsStream(CLUSTER_CONFIGURATION_FILE);
         if (is != null) {
             try {
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
index 3eff214..64de250 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
@@ -20,9 +20,9 @@
 
 import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.api.IMetadataLockManager;
-import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
 import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.metadata.IMetadataBootstrap;
 import org.apache.asterix.common.replication.IFaultToleranceStrategy;
 import org.apache.asterix.common.transactions.IResourceIdManager;
 import org.apache.hyracks.api.application.ICCServiceContext;
@@ -99,7 +99,7 @@
     IMetadataLockManager getMetadataLockManager();
 
     /**
-     * @return the cluster state manager
+     * @return the metadata bootstrap
      */
-    IClusterStateManager getClusterStateManager();
+    IMetadataBootstrap getMetadataBootstrap();
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
index 2eb81d4..0a47788 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
@@ -25,6 +25,7 @@
 
 import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.library.ILibraryManager;
 import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.external.api.IDataFlowController;
@@ -92,8 +93,7 @@
     public synchronized IDataSourceAdapter createAdapter(IHyracksTaskContext 
ctx, int partition)
             throws HyracksDataException {
         INCServiceContext serviceCtx = 
ctx.getJobletContext().getServiceContext();
-        INcApplicationContext appCtx =
-                (INcApplicationContext) serviceCtx.getApplicationContext();
+        INcApplicationContext appCtx = (INcApplicationContext) 
serviceCtx.getApplicationContext();
         try {
             restoreExternalObjects(serviceCtx, appCtx.getLibraryManager());
         } catch (Exception e) {
@@ -152,15 +152,16 @@
         dataParserFactory.setMetaType(metaType);
         dataParserFactory.configure(configuration);
         
ExternalDataCompatibilityUtils.validateCompatibility(dataSourceFactory, 
dataParserFactory);
-        configureFeedLogManager();
+        configureFeedLogManager(appCtx);
         nullifyExternalObjects();
     }
 
-    private void configureFeedLogManager() throws HyracksDataException, 
AlgebricksException {
+    private void configureFeedLogManager(IApplicationContext appCtx) throws 
HyracksDataException, AlgebricksException {
         this.isFeed = ExternalDataUtils.isFeed(configuration);
         if (isFeed) {
-            feedLogFileSplits = 
FeedUtils.splitsForAdapter(ExternalDataUtils.getDataverse(configuration),
-                    ExternalDataUtils.getFeedName(configuration), 
dataSourceFactory.getPartitionConstraint());
+            feedLogFileSplits = 
FeedUtils.splitsForAdapter((ICcApplicationContext) appCtx,
+                    ExternalDataUtils.getDataverse(configuration), 
ExternalDataUtils.getFeedName(configuration),
+                    dataSourceFactory.getPartitionConstraint());
         }
     }
 
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
index edda448..7cfbf51 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
@@ -25,8 +25,9 @@
 import java.util.Set;
 
 import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.application.IServiceContext;
@@ -87,6 +88,7 @@
     public static AlgebricksAbsolutePartitionConstraint 
getPartitionConstraints(IApplicationContext appCtx,
             AlgebricksAbsolutePartitionConstraint constraints, int count) 
throws AlgebricksException {
         if (constraints == null) {
+            IClusterStateManager clusterStateManager = 
((ICcApplicationContext) appCtx).getClusterStateManager();
             ArrayList<String> locs = new ArrayList<>();
             Set<String> stores = 
appCtx.getMetadataProperties().getStores().keySet();
             if (stores.isEmpty()) {
@@ -97,7 +99,7 @@
                 Iterator<String> storeIt = stores.iterator();
                 while (storeIt.hasNext()) {
                     String node = storeIt.next();
-                    int numIODevices = 
ClusterStateManager.INSTANCE.getIODevices(node).length;
+                    int numIODevices = 
clusterStateManager.getIODevices(node).length;
                     for (int k = 0; k < numIODevices; k++) {
                         locs.add(node);
                         i++;
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
index 12be449..c7b8633 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
@@ -23,10 +23,10 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.api.IInputStreamFactory;
 import org.apache.asterix.external.input.stream.TwitterFirehoseInputStream;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.api.application.IServiceContext;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -54,6 +54,7 @@
     private static final String KEY_INGESTION_LOCATIONS = "ingestion-location";
 
     private Map<String, String> configuration;
+    private transient IServiceContext serviceCtx;
 
     @Override
     public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
@@ -67,10 +68,10 @@
         if (ingestionCardinalityParam != null) {
             count = Integer.parseInt(ingestionCardinalityParam);
         }
-
+        ICcApplicationContext appCtx = (ICcApplicationContext) 
serviceCtx.getApplicationContext();
         List<String> chosenLocations = new ArrayList<>();
         String[] availableLocations = locations != null ? locations
-                : 
ClusterStateManager.INSTANCE.getParticipantNodes().toArray(new String[] {});
+                : 
appCtx.getClusterStateManager().getParticipantNodes().toArray(new String[] {});
         for (int i = 0, k = 0; i < count; i++, k = (k + 1) % 
availableLocations.length) {
             chosenLocations.add(availableLocations[k]);
         }
@@ -84,6 +85,7 @@
 
     @Override
     public void configure(IServiceContext serviceCtx, Map<String, String> 
configuration) {
+        this.serviceCtx = serviceCtx;
         this.configuration = configuration;
     }
 
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
index ec7de91..dad0d51 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
@@ -26,9 +26,9 @@
 
 import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.config.ClusterProperties;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.utils.StoragePathUtil;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint.PartitionConstraintType;
@@ -63,9 +63,9 @@
     }
 
     public enum Mode {
-        PROCESS,            // There is memory
-        SPILL,              // Memory budget has been consumed. Now we're 
writing to disk
-        DISCARD             // Memory and Disk space budgets have been 
consumed. Now we're discarding
+        PROCESS, // There is memory
+        SPILL, // Memory budget has been consumed. Now we're writing to disk
+        DISCARD // Memory and Disk space budgets have been consumed. Now we're 
discarding
     }
 
     private FeedUtils() {
@@ -87,7 +87,7 @@
         return StoragePathUtil.getFileSplitForClusterPartition(partition, 
f.getPath());
     }
 
-    public static FileSplit[] splitsForAdapter(String dataverseName, String 
feedName,
+    public static FileSplit[] splitsForAdapter(ICcApplicationContext appCtx, 
String dataverseName, String feedName,
             AlgebricksPartitionConstraint partitionConstraints) throws 
AsterixException {
         if (partitionConstraints.getPartitionConstraintType() == 
PartitionConstraintType.COUNT) {
             throw new AsterixException("Can't create file splits for adapter 
with count partitioning constraints");
@@ -96,7 +96,7 @@
         List<FileSplit> splits = new ArrayList<>();
         for (String nd : locations) {
             splits.add(splitsForAdapter(dataverseName, feedName, nd,
-                    ClusterStateManager.INSTANCE.getNodePartitions(nd)[0]));
+                    appCtx.getClusterStateManager().getNodePartitions(nd)[0]));
         }
         return splits.toArray(new FileSplit[] {});
     }
@@ -113,8 +113,8 @@
 
     public static FeedLogManager getFeedLogManager(IHyracksTaskContext ctx, 
FileSplit feedLogFileSplit)
             throws HyracksDataException {
-        return new 
FeedLogManager(FeedUtils.getAbsoluteFileRef(feedLogFileSplit.getPath(),
-                0, ctx.getIoManager()).getFile());
+        return new FeedLogManager(
+                FeedUtils.getAbsoluteFileRef(feedLogFileSplit.getPath(), 0, 
ctx.getIoManager()).getFile());
     }
 
     public static void processFeedMessage(ByteBuffer input, VSizeFrame 
message, FrameTupleAccessor fta)
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index b4353e7..bd50352 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -25,7 +25,9 @@
 import java.util.Map;
 
 import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.external.indexing.ExternalFile;
@@ -33,7 +35,6 @@
 import org.apache.asterix.external.indexing.RecordId.RecordIdType;
 import org.apache.asterix.external.input.stream.HDFSInputStream;
 import org.apache.asterix.hivecompat.io.RCFileInputFormat;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -207,10 +208,12 @@
     public static AlgebricksAbsolutePartitionConstraint 
getPartitionConstraints(IApplicationContext appCtx,
             AlgebricksAbsolutePartitionConstraint clusterLocations) {
         if (clusterLocations == null) {
+            IClusterStateManager clusterStateManager = 
((ICcApplicationContext) appCtx).getClusterStateManager();
             ArrayList<String> locs = new ArrayList<>();
             Map<String, String[]> stores = 
appCtx.getMetadataProperties().getStores();
             for (String node : stores.keySet()) {
-                int numIODevices = 
ClusterStateManager.INSTANCE.getIODevices(node).length;
+
+                int numIODevices = 
clusterStateManager.getIODevices(node).length;
                 for (int k = 0; k < numIODevices; k++) {
                     locs.add(node);
                 }
diff --git 
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java
 
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java
index c45941d..c09b9eb 100644
--- 
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java
+++ 
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java
@@ -23,9 +23,9 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.api.IRecordReaderFactory;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.api.application.IServiceContext;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -43,17 +43,23 @@
     private int upsertCycle = 0;
     private int numOfReaders;
     private transient AlgebricksAbsolutePartitionConstraint clusterLocations;
+    private transient IServiceContext serviceCtx;
     private static final List<String> recordReaderNames = 
Collections.unmodifiableList(Arrays.asList());
 
     @Override
     public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
-        clusterLocations = ClusterStateManager.INSTANCE.getClusterLocations();
-        numOfReaders = clusterLocations.getLocations().length;
+        if (clusterLocations == null) {
+            ICcApplicationContext appCtx = (ICcApplicationContext) 
serviceCtx.getApplicationContext();
+            clusterLocations = 
appCtx.getClusterStateManager().getClusterLocations();
+            numOfReaders = clusterLocations.getLocations().length;
+        }
         return clusterLocations;
+
     }
 
     @Override
     public void configure(IServiceContext serviceCtx, final Map<String, 
String> configuration) {
+        this.serviceCtx = serviceCtx;
         if (configuration.containsKey("num-of-records")) {
             numOfRecords = 
Integer.parseInt(configuration.get("num-of-records"));
         }
@@ -83,7 +89,8 @@
         return DCPRequest.class;
     }
 
-    @Override public List<String> getRecordReaderNames() {
+    @Override
+    public List<String> getRecordReaderNames() {
         return recordReaderNames;
     }
 }
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/hints/DatasetHints.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/hints/DatasetHints.java
index 709f655..0f97194 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/hints/DatasetHints.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/hints/DatasetHints.java
@@ -21,8 +21,8 @@
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 
 /**
@@ -113,7 +113,8 @@
                 if (intValue < 0) {
                     return new Pair<>(false, "Value must be >= 0");
                 }
-                int numNodesInCluster = 
ClusterStateManager.INSTANCE.getParticipantNodes(true).size();
+                IClusterStateManager csm = appCtx.getClusterStateManager();
+                int numNodesInCluster = csm.getParticipantNodes(true).size();
                 if (numNodesInCluster < intValue) {
                     return new Pair<>(false,
                             "Value must be less than or equal to the available 
number of nodes in cluster ("
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
index 26cec1e..0b6608c 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
@@ -22,7 +22,7 @@
 import java.util.List;
 
 import org.apache.asterix.active.EntityId;
-import org.apache.asterix.external.feed.api.IFeed;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.operators.FeedCollectOperatorDescriptor;
 import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
@@ -34,7 +34,6 @@
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -63,12 +62,12 @@
     private final List<ScalarFunctionCallExpression> keyAccessExpression;
     private final FeedConnection feedConnection;
 
-    public FeedDataSource(Feed feed, DataSourceId id, String targetDataset, 
IAType itemType, IAType metaType,
-            List<IAType> pkTypes, List<List<String>> partitioningKeys,
-            List<ScalarFunctionCallExpression> keyAccessExpression, EntityId 
sourceFeedId,
-            FeedRuntimeType location, String[] locations, INodeDomain domain, 
FeedConnection feedConnection)
-            throws AlgebricksException {
+    public FeedDataSource(MetadataProvider metadataProvider, Feed feed, 
DataSourceId id, String targetDataset,
+            IAType itemType, IAType metaType, List<IAType> pkTypes,
+            List<ScalarFunctionCallExpression> keyAccessExpression, EntityId 
sourceFeedId, FeedRuntimeType location,
+            String[] locations, INodeDomain domain, FeedConnection 
feedConnection) throws AlgebricksException {
         super(id, itemType, metaType, Type.FEED, domain);
+        ICcApplicationContext appCtx = 
metadataProvider.getApplicationContext();
         this.feed = feed;
         this.targetDataset = targetDataset;
         this.sourceFeedId = sourceFeedId;
@@ -76,7 +75,7 @@
         this.locations = locations;
         this.pkTypes = pkTypes;
         this.keyAccessExpression = keyAccessExpression;
-        this.computeCardinality = 
ClusterStateManager.INSTANCE.getParticipantNodes().size();
+        this.computeCardinality = 
appCtx.getClusterStateManager().getParticipantNodes().size();
         this.feedConnection = feedConnection;
         initFeedDataSource();
     }
@@ -170,8 +169,8 @@
             throws AlgebricksException {
         try {
             ARecordType feedOutputType = (ARecordType) itemType;
-            ISerializerDeserializer payloadSerde = 
NonTaggedDataFormat.INSTANCE.getSerdeProvider()
-                    .getSerializerDeserializer(feedOutputType);
+            ISerializerDeserializer payloadSerde =
+                    
NonTaggedDataFormat.INSTANCE.getSerdeProvider().getSerializerDeserializer(feedOutputType);
             ArrayList<ISerializerDeserializer> serdes = new ArrayList<>();
             serdes.add(payloadSerde);
             if (metaItemType != null) {
@@ -182,16 +181,16 @@
                     
serdes.add(SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(type));
                 }
             }
-            RecordDescriptor feedDesc = new RecordDescriptor(
-                    serdes.toArray(new 
ISerializerDeserializer[serdes.size()]));
-            FeedPolicyEntity feedPolicy = (FeedPolicyEntity) getProperties()
-                    .get(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
+            RecordDescriptor feedDesc =
+                    new RecordDescriptor(serdes.toArray(new 
ISerializerDeserializer[serdes.size()]));
+            FeedPolicyEntity feedPolicy =
+                    (FeedPolicyEntity) 
getProperties().get(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
             if (feedPolicy == null) {
                 throw new AlgebricksException("Feed not configured with a 
policy");
             }
             
feedPolicy.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, 
feedPolicy.getPolicyName());
-            FeedConnectionId feedConnectionId = new 
FeedConnectionId(getId().getDataverseName(),
-                    getId().getDatasourceName(), getTargetDataset());
+            FeedConnectionId feedConnectionId =
+                    new FeedConnectionId(getId().getDataverseName(), 
getId().getDatasourceName(), getTargetDataset());
             FeedCollectOperatorDescriptor feedCollector = new 
FeedCollectOperatorDescriptor(jobSpec, feedConnectionId,
                     feedOutputType, feedDesc, feedPolicy.getProperties(), 
getLocation());
 
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java
index c23755d..97c6ed2 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java
@@ -21,6 +21,7 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.exceptions.MetadataException;
 import org.apache.asterix.metadata.MetadataManager;
@@ -36,7 +37,6 @@
 import org.apache.asterix.metadata.utils.MetadataConstants;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.DefaultNodeGroupDomain;
 import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
@@ -111,12 +111,12 @@
         return dataset;
     }
 
-    public static INodeDomain findNodeDomain(MetadataTransactionContext 
mdTxnCtx, String nodeGroupName)
-            throws AlgebricksException {
+    public static INodeDomain findNodeDomain(IClusterStateManager 
clusterStateManager,
+            MetadataTransactionContext mdTxnCtx, String nodeGroupName) throws 
AlgebricksException {
         NodeGroup nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, 
nodeGroupName);
         List<String> partitions = new ArrayList<>();
         for (String location : nodeGroup.getNodeNames()) {
-            int numPartitions = 
ClusterStateManager.INSTANCE.getNodePartitionsCount(location);
+            int numPartitions = 
clusterStateManager.getNodePartitionsCount(location);
             for (int i = 0; i < numPartitions; i++) {
                 partitions.add(location);
             }
@@ -165,24 +165,24 @@
         }
     }
 
-    public static DataSource findDataSource(MetadataTransactionContext 
mdTxnCtx, DataSourceId id)
-            throws AlgebricksException {
+    public static DataSource findDataSource(IClusterStateManager 
clusterStateManager,
+            MetadataTransactionContext mdTxnCtx, DataSourceId id) throws 
AlgebricksException {
         try {
-            return lookupSourceInMetadata(mdTxnCtx, id);
+            return lookupSourceInMetadata(clusterStateManager, mdTxnCtx, id);
         } catch (MetadataException e) {
             throw new AlgebricksException(e);
         }
     }
 
-    public static DataSource lookupSourceInMetadata(MetadataTransactionContext 
mdTxnCtx, DataSourceId aqlId)
-            throws AlgebricksException {
+    public static DataSource lookupSourceInMetadata(IClusterStateManager 
clusterStateManager,
+            MetadataTransactionContext mdTxnCtx, DataSourceId aqlId) throws 
AlgebricksException {
         Dataset dataset = findDataset(mdTxnCtx, aqlId.getDataverseName(), 
aqlId.getDatasourceName());
         if (dataset == null) {
             throw new AlgebricksException("Datasource with id " + aqlId + " 
was not found.");
         }
         IAType itemType = findType(mdTxnCtx, 
dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
         IAType metaItemType = findType(mdTxnCtx, 
dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName());
-        INodeDomain domain = findNodeDomain(mdTxnCtx, 
dataset.getNodeGroupName());
+        INodeDomain domain = findNodeDomain(clusterStateManager, mdTxnCtx, 
dataset.getNodeGroupName());
         byte datasourceType = 
dataset.getDatasetType().equals(DatasetType.EXTERNAL) ? 
DataSource.Type.EXTERNAL_DATASET
                 : DataSource.Type.INTERNAL_DATASET;
         return new DatasetDataSource(aqlId, dataset, itemType, metaItemType, 
datasourceType,
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 8971a90..d6a3f21 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -26,6 +26,7 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
@@ -84,7 +85,6 @@
 import org.apache.asterix.runtime.formats.FormatUtils;
 import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
 import 
org.apache.asterix.runtime.operators.LSMSecondaryUpsertOperatorDescriptor;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -295,7 +295,7 @@
     }
 
     public INodeDomain findNodeDomain(String nodeGroupName) throws 
AlgebricksException {
-        return MetadataManagerUtil.findNodeDomain(mdTxnCtx, nodeGroupName);
+        return 
MetadataManagerUtil.findNodeDomain(appCtx.getClusterStateManager(), mdTxnCtx, 
nodeGroupName);
     }
 
     public List<String> findNodes(String nodeGroupName) throws 
AlgebricksException {
@@ -329,11 +329,11 @@
 
     @Override
     public DataSource findDataSource(DataSourceId id) throws 
AlgebricksException {
-        return MetadataManagerUtil.findDataSource(mdTxnCtx, id);
+        return 
MetadataManagerUtil.findDataSource(appCtx.getClusterStateManager(), mdTxnCtx, 
id);
     }
 
     public DataSource lookupSourceInMetadata(DataSourceId aqlId) throws 
AlgebricksException {
-        return MetadataManagerUtil.lookupSourceInMetadata(mdTxnCtx, aqlId);
+        return 
MetadataManagerUtil.lookupSourceInMetadata(appCtx.getClusterStateManager(), 
mdTxnCtx, aqlId);
     }
 
     @Override
@@ -709,8 +709,9 @@
         int numPartitions = 0;
         List<String> nodeGroup =
                 MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, 
dataset.getNodeGroupName()).getNodeNames();
+        IClusterStateManager csm = appCtx.getClusterStateManager();
         for (String nd : nodeGroup) {
-            numPartitions += 
ClusterStateManager.INSTANCE.getNodePartitionsCount(nd);
+            numPartitions += csm.getNodePartitionsCount(nd);
         }
         return numElementsHint / numPartitions;
     }
@@ -755,12 +756,13 @@
     }
 
     public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitAndConstraints(String dataverse) {
-        return 
SplitsAndConstraintsUtil.getDataverseSplitProviderAndConstraints(dataverse);
+        return 
SplitsAndConstraintsUtil.getDataverseSplitProviderAndConstraints(appCtx.getClusterStateManager(),
+                dataverse);
     }
 
     public FileSplit[] splitsForIndex(MetadataTransactionContext mdTxnCtx, 
Dataset dataset, String indexName)
             throws AlgebricksException {
-        return SplitsAndConstraintsUtil.getIndexSplits(dataset, indexName, 
mdTxnCtx);
+        return 
SplitsAndConstraintsUtil.getIndexSplits(appCtx.getClusterStateManager(), 
dataset, indexName, mdTxnCtx);
     }
 
     public DatasourceAdapter getAdapter(MetadataTransactionContext mdTxnCtx, 
String dataverseName, String adapterName)
@@ -777,7 +779,7 @@
     }
 
     public AlgebricksAbsolutePartitionConstraint getClusterLocations() {
-        return ClusterStateManager.INSTANCE.getClusterLocations();
+        return appCtx.getClusterStateManager().getClusterLocations();
     }
 
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
buildExternalDataLookupRuntime(
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
index 6825f10..5b7ea59 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
@@ -23,6 +23,7 @@
 import java.util.List;
 
 import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.config.ClusterProperties;
 import org.apache.asterix.common.exceptions.MetadataException;
 import org.apache.asterix.common.utils.StoragePathUtil;
@@ -30,7 +31,6 @@
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.NodeGroup;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -42,11 +42,11 @@
     private SplitsAndConstraintsUtil() {
     }
 
-    private static FileSplit[] getDataverseSplits(String dataverseName) {
+    private static FileSplit[] getDataverseSplits(IClusterStateManager 
clusterStateManager, String dataverseName) {
         File relPathFile = new File(dataverseName);
         List<FileSplit> splits = new ArrayList<>();
         // get all partitions
-        ClusterPartition[] clusterPartition = 
ClusterStateManager.INSTANCE.getClusterPartitons();
+        ClusterPartition[] clusterPartition = 
clusterStateManager.getClusterPartitons();
         String storageDirName = 
ClusterProperties.INSTANCE.getStorageDirectoryName();
         for (int j = 0; j < clusterPartition.length; j++) {
             File f = new File(
@@ -57,28 +57,29 @@
         return splits.toArray(new FileSplit[] {});
     }
 
-    public static FileSplit[] getIndexSplits(Dataset dataset, String 
indexName, MetadataTransactionContext mdTxnCtx)
-            throws AlgebricksException {
+    public static FileSplit[] getIndexSplits(IClusterStateManager 
clusterStateManager, Dataset dataset,
+            String indexName, MetadataTransactionContext mdTxnCtx) throws 
AlgebricksException {
         try {
             NodeGroup nodeGroup = 
MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName());
             if (nodeGroup == null) {
                 throw new AlgebricksException("Couldn't find node group " + 
dataset.getNodeGroupName());
             }
             List<String> nodeList = nodeGroup.getNodeNames();
-            return getIndexSplits(dataset, indexName, nodeList);
+            return getIndexSplits(clusterStateManager, dataset, indexName, 
nodeList);
         } catch (MetadataException me) {
             throw new AlgebricksException(me);
         }
     }
 
-    public static FileSplit[] getIndexSplits(Dataset dataset, String 
indexName, List<String> nodes) {
+    public static FileSplit[] getIndexSplits(IClusterStateManager 
clusterStateManager, Dataset dataset,
+            String indexName, List<String> nodes) {
         File relPathFile = new 
File(StoragePathUtil.prepareDataverseIndexName(dataset.getDataverseName(),
                 dataset.getDatasetName(), indexName, 
dataset.getRebalanceCount()));
         String storageDirName = 
ClusterProperties.INSTANCE.getStorageDirectoryName();
         List<FileSplit> splits = new ArrayList<>();
         for (String nd : nodes) {
-            int numPartitions = 
ClusterStateManager.INSTANCE.getNodePartitionsCount(nd);
-            ClusterPartition[] nodePartitions = 
ClusterStateManager.INSTANCE.getNodePartitions(nd);
+            int numPartitions = clusterStateManager.getNodePartitionsCount(nd);
+            ClusterPartition[] nodePartitions = 
clusterStateManager.getNodePartitions(nd);
             // currently this case is never executed since the metadata group 
doesn't exists
             if 
(dataset.getNodeGroupName().compareTo(MetadataConstants.METADATA_NODEGROUP_NAME)
 == 0) {
                 numPartitions = 1;
@@ -97,8 +98,8 @@
     }
 
     public static Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
getDataverseSplitProviderAndConstraints(
-            String dataverse) {
-        FileSplit[] splits = getDataverseSplits(dataverse);
+            IClusterStateManager clusterStateManager, String dataverse) {
+        FileSplit[] splits = getDataverseSplits(clusterStateManager, 
dataverse);
         return StoragePathUtil.splitProviderAndPartitionConstraints(splits);
     }
 
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
index 194fd59..82a1177 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
@@ -20,11 +20,11 @@
 
 import java.util.Set;
 
+import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.messaging.api.ICCMessageBroker;
 import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
 import org.apache.asterix.common.transactions.IResourceIdManager;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class ResourceIdRequestMessage implements ICcAddressedMessage {
@@ -40,7 +40,8 @@
         try {
             ICCMessageBroker broker = (ICCMessageBroker) 
appCtx.getServiceContext().getMessageBroker();
             ResourceIdRequestResponseMessage reponse = new 
ResourceIdRequestResponseMessage();
-            if (!ClusterStateManager.INSTANCE.isClusterActive()) {
+            IClusterStateManager clusterStateManager = 
appCtx.getClusterStateManager();
+            if (!clusterStateManager.isClusterActive()) {
                 reponse.setResourceId(-1);
                 reponse.setException(new Exception("Cannot generate global 
resource id when cluster is not active."));
             } else {
@@ -49,7 +50,7 @@
                 if (reponse.getResourceId() < 0) {
                     reponse.setException(new Exception("One or more nodes has 
not reported max resource id."));
                 }
-                requestMaxResourceID(resourceIdManager, broker);
+                requestMaxResourceID(clusterStateManager, resourceIdManager, 
broker);
             }
             broker.sendApplicationMessageToNC(reponse, src);
         } catch (Exception e) {
@@ -57,8 +58,9 @@
         }
     }
 
-    private void requestMaxResourceID(IResourceIdManager resourceIdManager, 
ICCMessageBroker broker) throws Exception {
-        Set<String> getParticipantNodes = 
ClusterStateManager.INSTANCE.getParticipantNodes();
+    private void requestMaxResourceID(IClusterStateManager 
clusterStateManager, IResourceIdManager resourceIdManager,
+            ICCMessageBroker broker) throws Exception {
+        Set<String> getParticipantNodes = 
clusterStateManager.getParticipantNodes();
         ReportMaxResourceIdRequestMessage msg = new 
ReportMaxResourceIdRequestMessage();
         for (String nodeId : getParticipantNodes) {
             if (!resourceIdManager.reported(nodeId)) {
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/ResourceIdManager.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
similarity index 84%
rename from 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/ResourceIdManager.java
rename to 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
index 372404c..6a5ed08 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/ResourceIdManager.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
@@ -16,27 +16,32 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.app.cc;
+package org.apache.asterix.runtime.transaction;
 
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.transactions.IResourceIdManager;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
 
 public class ResourceIdManager implements IResourceIdManager {
 
+    private final IClusterStateManager csm;
     private final AtomicLong globalResourceId = new AtomicLong();
     private volatile Set<String> reportedNodes = new HashSet<>();
     private volatile boolean allReported = false;
+
+    public ResourceIdManager(IClusterStateManager csm) {
+        this.csm = csm;
+    }
 
     @Override
     public long createResourceId() {
         if (!allReported) {
             synchronized (this) {
                 if (!allReported) {
-                    if (reportedNodes.size() < 
ClusterStateManager.INSTANCE.getNumberOfNodes()) {
+                    if (reportedNodes.size() < csm.getNumberOfNodes()) {
                         return -1;
                     } else {
                         reportedNodes = null;
@@ -58,7 +63,7 @@
         if (!allReported) {
             globalResourceId.set(Math.max(maxResourceId, 
globalResourceId.get()));
             reportedNodes.add(nodeId);
-            if (reportedNodes.size() == 
ClusterStateManager.INSTANCE.getNumberOfNodes()) {
+            if (reportedNodes.size() == csm.getNumberOfNodes()) {
                 reportedNodes = null;
                 allReported = true;
             }
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
index 28c480f..e4cc7f4 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
@@ -43,6 +43,7 @@
 import org.apache.asterix.common.metadata.IMetadataBootstrap;
 import org.apache.asterix.common.replication.IFaultToleranceStrategy;
 import org.apache.asterix.common.transactions.IResourceIdManager;
+import org.apache.asterix.runtime.transaction.ResourceIdManager;
 import org.apache.hyracks.api.application.ICCServiceContext;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.job.IJobLifecycleListener;
@@ -77,17 +78,16 @@
     private IFaultToleranceStrategy ftStrategy;
     private IJobLifecycleListener activeLifeCycleListener;
     private IMetadataLockManager mdLockManager;
+    private IClusterStateManager clusterStateManager;
 
     public CcApplicationContext(ICCServiceContext ccServiceCtx, 
IHyracksClientConnection hcc,
-            ILibraryManager libraryManager, IResourceIdManager 
resourceIdManager,
-            Supplier<IMetadataBootstrap> metadataBootstrapSupplier, 
IGlobalRecoveryManager globalRecoveryManager,
-            IFaultToleranceStrategy ftStrategy, IJobLifecycleListener 
activeLifeCycleListener,
-            IStorageComponentProvider storageComponentProvider, 
IMetadataLockManager mdLockManager)
-            throws AsterixException, IOException {
+            ILibraryManager libraryManager, Supplier<IMetadataBootstrap> 
metadataBootstrapSupplier,
+            IGlobalRecoveryManager globalRecoveryManager, 
IFaultToleranceStrategy ftStrategy,
+            IJobLifecycleListener activeLifeCycleListener, 
IStorageComponentProvider storageComponentProvider,
+            IMetadataLockManager mdLockManager) throws AsterixException, 
IOException {
         this.ccServiceCtx = ccServiceCtx;
         this.hcc = hcc;
         this.libraryManager = libraryManager;
-        this.resourceIdManager = resourceIdManager;
         this.activeLifeCycleListener = activeLifeCycleListener;
         // Determine whether to use old-style asterix-configuration.xml or 
new-style configuration.
         // QQQ strip this out eventually
@@ -109,6 +109,9 @@
         this.globalRecoveryManager = globalRecoveryManager;
         this.storageComponentProvider = storageComponentProvider;
         this.mdLockManager = mdLockManager;
+        clusterStateManager = new ClusterStateManager();
+        clusterStateManager.setCcAppCtx(this);
+        this.resourceIdManager = new ResourceIdManager(clusterStateManager);
     }
 
     @Override
@@ -204,6 +207,7 @@
         return resourceIdManager;
     }
 
+    @Override
     public IMetadataBootstrap getMetadataBootstrap() {
         return metadataBootstrapSupplier.get();
     }
@@ -230,6 +234,6 @@
 
     @Override
     public IClusterStateManager getClusterStateManager() {
-        return ClusterStateManager.INSTANCE;
+        return clusterStateManager;
     }
 }
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index cdb3112..2042c7c 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -36,6 +36,7 @@
 import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.config.ClusterProperties;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.replication.IFaultToleranceStrategy;
@@ -66,7 +67,6 @@
      */
 
     private static final Logger LOGGER = 
Logger.getLogger(ClusterStateManager.class.getName());
-    public static final ClusterStateManager INSTANCE = new 
ClusterStateManager();
     private final Map<String, Map<IOption, Object>> activeNcConfiguration = 
new HashMap<>();
     private Set<String> pendingRemoval = new HashSet<>();
     private final Cluster cluster;
@@ -78,13 +78,16 @@
     private boolean metadataNodeActive = false;
     private Set<String> failedNodes = new HashSet<>();
     private IFaultToleranceStrategy ftStrategy;
-    private CcApplicationContext appCtx;
+    private ICcApplicationContext appCtx;
 
-    private ClusterStateManager() {
+    public ClusterStateManager() {
+        Exception e = new Exception();
+        LOGGER.log(Level.WARNING, "Creating Cluster State manager", e);
         cluster = ClusterProperties.INSTANCE.getCluster();
     }
 
-    public void setCcAppCtx(CcApplicationContext appCtx) {
+    @Override
+    public void setCcAppCtx(ICcApplicationContext appCtx) {
         this.appCtx = appCtx;
         node2PartitionsMap = 
appCtx.getMetadataProperties().getNodePartitions();
         clusterPartitions = 
appCtx.getMetadataProperties().getClusterPartitions();
@@ -93,6 +96,7 @@
         ftStrategy.bindTo(this);
     }
 
+    @Override
     public synchronized void removeNCConfiguration(String nodeId) throws 
HyracksException {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Removing configuration parameters for node id " + 
nodeId);
@@ -102,6 +106,7 @@
         pendingRemoval.remove(nodeId);
     }
 
+    @Override
     public synchronized void addNCConfiguration(String nodeId, Map<IOption, 
Object> configuration)
             throws HyracksException {
         if (LOGGER.isLoggable(Level.INFO)) {
@@ -209,13 +214,7 @@
         return true;
     }
 
-    /**
-     * Returns the IO devices configured for a Node Controller
-     *
-     * @param nodeId
-     *            unique identifier of the Node Controller
-     * @return a list of IO devices.
-     */
+    @Override
     public synchronized String[] getIODevices(String nodeId) {
         Map<IOption, Object> ncConfig = activeNcConfiguration.get(nodeId);
         if (ncConfig == null) {
@@ -233,11 +232,13 @@
         return state;
     }
 
+    @Override
     public synchronized Node getAvailableSubstitutionNode() {
         List<Node> subNodes = cluster.getSubstituteNodes() == null ? null : 
cluster.getSubstituteNodes().getNode();
         return subNodes == null || subNodes.isEmpty() ? null : subNodes.get(0);
     }
 
+    @Override
     public synchronized Set<String> getParticipantNodes() {
         Set<String> participantNodes = new HashSet<>();
         for (String pNode : activeNcConfiguration.keySet()) {
@@ -246,6 +247,7 @@
         return participantNodes;
     }
 
+    @Override
     public synchronized Set<String> getParticipantNodes(boolean 
excludePendingRemoval) {
         Set<String> participantNodes = getParticipantNodes();
         if (excludePendingRemoval) {
@@ -254,6 +256,7 @@
         return participantNodes;
     }
 
+    @Override
     public synchronized AlgebricksAbsolutePartitionConstraint 
getClusterLocations() {
         if (clusterPartitionConstraint == null) {
             resetClusterPartitionConstraint();
@@ -272,6 +275,7 @@
                 new 
AlgebricksAbsolutePartitionConstraint(clusterActiveLocations.toArray(new 
String[] {}));
     }
 
+    @Override
     public synchronized boolean isClusterActive() {
         if (cluster == null) {
             // this is a virtual cluster
@@ -280,6 +284,7 @@
         return state == ClusterState.ACTIVE;
     }
 
+    @Override
     public int getNumberOfNodes() {
         return appCtx.getMetadataProperties().getNodeNames().size();
     }
@@ -289,6 +294,7 @@
         return node2PartitionsMap.get(nodeId);
     }
 
+    @Override
     public synchronized int getNodePartitionsCount(String node) {
         if (node2PartitionsMap.containsKey(node)) {
             return node2PartitionsMap.get(node).length;
@@ -305,10 +311,12 @@
         return partitons.toArray(new ClusterPartition[] {});
     }
 
+    @Override
     public synchronized boolean isMetadataNodeActive() {
         return metadataNodeActive;
     }
 
+    @Override
     public synchronized ObjectNode getClusterStateDescription() {
         ObjectMapper om = new ObjectMapper();
         ObjectNode stateDescription = om.createObjectNode();
@@ -342,6 +350,7 @@
         return stateDescription;
     }
 
+    @Override
     public synchronized ObjectNode getClusterStateSummary() {
         ObjectMapper om = new ObjectMapper();
         ObjectNode stateDescription = om.createObjectNode();
@@ -395,6 +404,7 @@
         }
     }
 
+    @Override
     public synchronized void removePending(String nodeId) {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Registering intention to remove node id " + nodeId);
@@ -406,6 +416,7 @@
         }
     }
 
+    @Override
     public synchronized boolean cancelRemovePending(String nodeId) {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Deregistering intention to remove node id " + nodeId);
@@ -423,8 +434,8 @@
     }
 
     private void updateNodeConfig(String nodeId, Map<IOption, Object> 
configuration) {
-        ConfigManager configManager = ((ConfigManagerApplicationConfig) 
appCtx.getServiceContext().getAppConfig())
-                .getConfigManager();
+        ConfigManager configManager =
+                ((ConfigManagerApplicationConfig) 
appCtx.getServiceContext().getAppConfig()).getConfigManager();
         for (Map.Entry<IOption, Object> entry : configuration.entrySet()) {
             if (entry.getKey().section() == Section.NC) {
                 configManager.set(nodeId, entry.getKey(), entry.getValue());

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1944
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Id6532245033ac4c6f6aa9f193539944eecb832f7
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com>

Reply via email to