abdullah alamoudi has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/1944
Change subject: Un-Singleton ClusterStateManager ...................................................................... Un-Singleton ClusterStateManager Change-Id: Id6532245033ac4c6f6aa9f193539944eecb832f7 --- M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ClusterProperties.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java M asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/hints/DatasetHints.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java R asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java 36 files changed, 350 insertions(+), 163 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/44/1944/1 diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java index 7f35d08..4550ba6 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java @@ -260,10 +260,10 @@ keyAccessExpression = null; keyAccessScalarFunctionCallExpression = null; } - FeedDataSource feedDataSource = new FeedDataSource(sourceFeed, aqlId, targetDataset, feedOutputType, metaType, - pkTypes, partitioningKeys, keyAccessScalarFunctionCallExpression, sourceFeed.getFeedId(), - FeedRuntimeType.valueOf(subscriptionLocation), locations.split(","), context.getComputationNodeDomain(), - feedConnection); + FeedDataSource feedDataSource = new FeedDataSource((MetadataProvider) context.getMetadataProvider(), sourceFeed, + aqlId, targetDataset, feedOutputType, metaType, pkTypes, keyAccessScalarFunctionCallExpression, + sourceFeed.getFeedId(), FeedRuntimeType.valueOf(subscriptionLocation), locations.split(","), + context.getComputationNodeDomain(), feedConnection); feedDataSource.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy); return feedDataSource; } diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java index 1d47095..c3f01e8 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java @@ -38,7 +38,6 @@ import org.apache.asterix.metadata.dataset.hints.DatasetHints; import org.apache.asterix.metadata.entities.Dataverse; import org.apache.asterix.metadata.utils.MetadataConstants; -import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -53,7 +52,7 @@ public void validateOperation(ICcApplicationContext appCtx, Dataverse defaultDataverse, Statement stmt) throws AsterixException { - final IClusterStateManager clusterStateManager = ClusterStateManager.INSTANCE; + final IClusterStateManager clusterStateManager = appCtx.getClusterStateManager(); final IGlobalRecoveryManager globalRecoveryManager = appCtx.getGlobalRecoveryManager(); if (!(clusterStateManager.getState().equals(ClusterState.ACTIVE) && globalRecoveryManager.isRecoveryCompleted())) { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java index 2799765..71c67f4 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java @@ -32,10 +32,10 @@ import org.apache.asterix.common.api.IClusterManagementWork.ClusterState; import org.apache.asterix.common.config.GlobalConfig; import org.apache.asterix.common.config.PropertiesAccessor; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.hyracks.bootstrap.CCApplication; import org.apache.asterix.hyracks.bootstrap.NCApplication; -import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.commons.io.FileUtils; import org.apache.hyracks.api.application.ICCApplication; import org.apache.hyracks.api.application.INCApplication; @@ -116,7 +116,7 @@ thread.join(); } // Wait until cluster becomes active - ClusterStateManager.INSTANCE.waitForState(ClusterState.ACTIVE); + ((ICcApplicationContext) cc.getApplicationContext()).getClusterStateManager().waitForState(ClusterState.ACTIVE); hcc = new HyracksConnection(cc.getConfig().getClientListenAddress(), cc.getConfig().getClientListenPort()); this.ncs = nodeControllers.toArray(new NodeControllerService[nodeControllers.size()]); } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java index 82e8f7a..218a3ea 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java @@ -28,8 +28,8 @@ import java.util.logging.Logger; import java.util.regex.Pattern; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.runtime.utils.CcApplicationContext; -import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.hyracks.api.config.IOption; import org.apache.hyracks.api.config.Section; import org.apache.hyracks.control.common.config.ConfigUtils; @@ -60,9 +60,11 @@ protected static final String VERSION_URI_KEY = "versionUri"; protected static final String DIAGNOSTICS_URI_KEY = "diagnosticsUri"; private final ObjectMapper om = new ObjectMapper(); + protected final ICcApplicationContext appCtx; - public ClusterApiServlet(ConcurrentMap<String, Object> ctx, String... paths) { + public ClusterApiServlet(ICcApplicationContext appCtx, ConcurrentMap<String, Object> ctx, String... paths) { super(ctx, paths); + this.appCtx = appCtx; } @Override @@ -94,11 +96,11 @@ } protected ObjectNode getClusterStateSummaryJSON() { - return ClusterStateManager.INSTANCE.getClusterStateSummary(); + return appCtx.getClusterStateManager().getClusterStateSummary(); } protected ObjectNode getClusterStateJSON(IServletRequest request, String pathToNode) { - ObjectNode json = ClusterStateManager.INSTANCE.getClusterStateDescription(); + ObjectNode json = appCtx.getClusterStateManager().getClusterStateDescription(); CcApplicationContext appConfig = (CcApplicationContext) ctx.get(ASTERIX_APP_CONTEXT_INFO_ATTR); json.putPOJO("config", ConfigUtils.getSectionOptionsForJSON(appConfig.getServiceContext().getAppConfig(), Section.COMMON, getConfigSelector())); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java index 52d4d67..848e1f1 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java @@ -26,6 +26,7 @@ import java.util.logging.Level; import java.util.logging.Logger; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.http.api.IServletRequest; import org.apache.hyracks.http.api.IServletResponse; @@ -41,8 +42,9 @@ private static final Logger LOGGER = Logger.getLogger(ClusterControllerDetailsApiServlet.class.getName()); private final ObjectMapper om = new ObjectMapper(); - public ClusterControllerDetailsApiServlet(ConcurrentMap<String, Object> ctx, String... paths) { - super(ctx, paths); + public ClusterControllerDetailsApiServlet(ICcApplicationContext appCtx, ConcurrentMap<String, Object> ctx, + String... paths) { + super(appCtx, ctx, paths); } @Override diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java index dcd0e70..6e62c95 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java @@ -49,11 +49,9 @@ public class DiagnosticsApiServlet extends NodeControllerDetailsApiServlet { private static final Logger LOGGER = Logger.getLogger(DiagnosticsApiServlet.class.getName()); - private final ICcApplicationContext appCtx; public DiagnosticsApiServlet(ConcurrentMap<String, Object> ctx, String[] paths, ICcApplicationContext appCtx) { - super(ctx, paths); - this.appCtx = appCtx; + super(appCtx, ctx, paths); } @Override diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java index d9757c7..d08204c 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java @@ -29,7 +29,9 @@ import java.util.logging.Level; import java.util.logging.Logger; -import org.apache.asterix.runtime.utils.ClusterStateManager; +import org.apache.asterix.common.cluster.ClusterPartition; +import org.apache.asterix.common.cluster.IClusterStateManager; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.http.api.IServletRequest; import org.apache.hyracks.http.api.IServletResponse; @@ -48,8 +50,9 @@ private static final Logger LOGGER = Logger.getLogger(NodeControllerDetailsApiServlet.class.getName()); private final ObjectMapper om = new ObjectMapper(); - public NodeControllerDetailsApiServlet(ConcurrentMap<String, Object> ctx, String... paths) { - super(ctx, paths); + public NodeControllerDetailsApiServlet(ICcApplicationContext appCtx, ConcurrentMap<String, Object> ctx, + String... paths) { + super(appCtx, ctx, paths); om.enable(SerializationFeature.INDENT_OUTPUT); } @@ -204,8 +207,9 @@ String dump = hcc.getThreadDump(node); if (dump == null) { // check to see if this is a node that is simply down - throw ClusterStateManager.INSTANCE.getNodePartitions(node) != null ? new IllegalStateException() - : new IllegalArgumentException(); + IClusterStateManager csm = appCtx.getClusterStateManager(); + ClusterPartition[] cp = csm.getNodePartitions(node); + throw cp != null ? new IllegalStateException() : new IllegalArgumentException(); } return (ObjectNode) om.readTree(dump); } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java index 1cec616..08f520a 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java @@ -39,7 +39,6 @@ import org.apache.asterix.lang.common.base.IParser; import org.apache.asterix.lang.common.base.Statement; import org.apache.asterix.metadata.MetadataManager; -import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.asterix.translator.IStatementExecutor; import org.apache.asterix.translator.IStatementExecutor.ResultDelivery; import org.apache.asterix.translator.IStatementExecutor.Stats; @@ -62,6 +61,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; + import io.netty.handler.codec.http.HttpResponseStatus; public class QueryServiceServlet extends AbstractQueryApiServlet { @@ -445,7 +445,7 @@ protected void executeStatement(String statementsText, SessionOutput sessionOutput, ResultDelivery delivery, IStatementExecutor.Stats stats, RequestParameters param, String handleUrl, long[] outExecStartEnd) throws Exception { - IClusterManagementWork.ClusterState clusterState = ClusterStateManager.INSTANCE.getState(); + IClusterManagementWork.ClusterState clusterState = appCtx.getClusterStateManager().getState(); if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) { // using a plain IllegalStateException here to get into the right catch clause for a 500 throw new IllegalStateException("Cannot execute request, cluster is " + clusterState); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java index fdd106d..f940145 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java @@ -27,8 +27,9 @@ import java.util.logging.Level; import java.util.logging.Logger; +import org.apache.asterix.common.api.IApplicationContext; +import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.config.GlobalConfig; -import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.http.api.IServletRequest; import org.apache.hyracks.http.api.IServletResponse; @@ -48,14 +49,17 @@ public static final String NCSERVICE_PID = "ncservice_pid"; public static final String INI = "ini"; public static final String PID = "pid"; + private final IApplicationContext appCtx; - public ShutdownApiServlet(ConcurrentMap<String, Object> ctx, String[] paths) { + public ShutdownApiServlet(IApplicationContext appCtx, ConcurrentMap<String, Object> ctx, String[] paths) { super(ctx, paths); + this.appCtx = appCtx; } @Override protected void post(IServletRequest request, IServletResponse response) { IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR); + IClusterStateManager csm = appCtx.getClusterStateManager(); boolean terminateNCServices = "true".equalsIgnoreCase(request.getParameter("all")); Thread t = new Thread(() -> { try { @@ -78,7 +82,7 @@ try { jsonObject.put("status", "SHUTTING_DOWN"); jsonObject.put("date", new Date().toString()); - ObjectNode clusterState = ClusterStateManager.INSTANCE.getClusterStateDescription(); + ObjectNode clusterState = csm.getClusterStateDescription(); ArrayNode ncs = (ArrayNode) clusterState.get("ncs"); for (int i = 0; i < ncs.size(); i++) { ObjectNode nc = (ObjectNode) ncs.get(i); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java index e7919fa..90b9f46 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java @@ -28,7 +28,9 @@ import org.apache.asterix.algebra.base.ILangExtension; import org.apache.asterix.api.http.server.ResultUtil; import org.apache.asterix.app.cc.CCExtensionManager; +import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.common.api.IClusterManagementWork; +import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.config.GlobalConfig; import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.dataflow.ICcApplicationContext; @@ -40,7 +42,6 @@ import org.apache.asterix.lang.common.base.Statement; import org.apache.asterix.messaging.CCMessageBroker; import org.apache.asterix.metadata.MetadataManager; -import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.asterix.translator.IStatementExecutor; import org.apache.asterix.translator.IStatementExecutorContext; import org.apache.asterix.translator.IStatementExecutorFactory; @@ -149,7 +150,9 @@ if (ccSrv.getNodeManager().getNodeControllerState(requestNodeId) == null) { return "Node is not registerted with the CC"; } - final IClusterManagementWork.ClusterState clusterState = ClusterStateManager.INSTANCE.getState(); + IApplicationContext appCtx = (IApplicationContext) ccSrv.getApplicationContext(); + IClusterStateManager csm = appCtx.getClusterStateManager(); + final IClusterManagementWork.ClusterState clusterState = csm.getState(); if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) { return "Cannot execute request, cluster is " + clusterState; } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java index 7647881..d52ce84 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java @@ -34,6 +34,7 @@ import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.cluster.ClusterPartition; +import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.config.ActiveProperties; import org.apache.asterix.common.config.AsterixExtension; import org.apache.asterix.common.config.BuildProperties; @@ -73,6 +74,7 @@ import org.apache.asterix.replication.recovery.RemoteRecoveryManager; import org.apache.asterix.replication.storage.ReplicaResourcesManager; import org.apache.asterix.runtime.transaction.GlobalResourceIdFactoryProvider; +import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepositoryFactory; import org.apache.hyracks.api.application.INCServiceContext; @@ -136,6 +138,8 @@ private final NCExtensionManager ncExtensionManager; private final IStorageComponentProvider componentProvider; + private final ClusterStateManager clusterStateManager; + public NCAppRuntimeContext(INCServiceContext ncServiceContext, List<AsterixExtension> extensions) throws AsterixException, InstantiationException, IllegalAccessException, ClassNotFoundException, IOException { @@ -160,6 +164,7 @@ ncExtensionManager = new NCExtensionManager(allExtensions); componentProvider = new StorageComponentProvider(); resourceIdFactory = new GlobalResourceIdFactoryProvider(ncServiceContext).createResourceIdFactory(); + clusterStateManager = new ClusterStateManager(); } @Override @@ -482,4 +487,9 @@ public INCServiceContext getServiceContext() { return ncServiceContext; } + + @Override + public IClusterStateManager getClusterStateManager() { + return clusterStateManager; + } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index b97c014..e966d3d 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -56,6 +56,7 @@ import org.apache.asterix.app.result.ResultHandle; import org.apache.asterix.app.result.ResultReader; import org.apache.asterix.common.api.IMetadataLockManager; +import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.config.ClusterProperties; import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp; @@ -152,7 +153,6 @@ import org.apache.asterix.om.types.ATypeTag; import org.apache.asterix.om.types.IAType; import org.apache.asterix.om.types.TypeSignature; -import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory; import org.apache.asterix.translator.AbstractLangTranslator; import org.apache.asterix.translator.CompiledStatements.CompiledDeleteStatement; @@ -706,7 +706,8 @@ protected static String configureNodegroupForDataset(ICcApplicationContext appCtx, Map<String, String> hints, String dataverseName, String datasetName, MetadataProvider metadataProvider) throws Exception { - Set<String> allNodes = ClusterStateManager.INSTANCE.getParticipantNodes(true); + IClusterStateManager csm = appCtx.getClusterStateManager(); + Set<String> allNodes = csm.getParticipantNodes(true); Set<String> selectedNodes = new LinkedHashSet<>(); String hintValue = hints.get(DatasetNodegroupCardinalityHint.NAME); if (hintValue == null) { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java index e8636c8..c26156d 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java @@ -53,7 +53,6 @@ import org.apache.asterix.api.http.servlet.ServletConstants; import org.apache.asterix.app.active.ActiveNotificationHandler; import org.apache.asterix.app.cc.CCExtensionManager; -import org.apache.asterix.app.cc.ResourceIdManager; import org.apache.asterix.app.external.ExternalLibraryUtils; import org.apache.asterix.app.replication.FaultToleranceStrategyFactory; import org.apache.asterix.common.api.AsterixThreadFactory; @@ -78,7 +77,6 @@ import org.apache.asterix.metadata.lock.MetadataLockManager; import org.apache.asterix.runtime.job.resource.JobCapacityController; import org.apache.asterix.runtime.utils.CcApplicationContext; -import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.asterix.translator.IStatementExecutorContext; import org.apache.asterix.translator.IStatementExecutorFactory; import org.apache.hyracks.api.application.ICCServiceContext; @@ -125,7 +123,7 @@ ccServiceCtx.setThreadFactory( new AsterixThreadFactory(ccServiceCtx.getThreadFactory(), new LifeCycleComponentManager())); ILibraryManager libraryManager = new ExternalLibraryManager(); - ResourceIdManager resourceIdManager = new ResourceIdManager(); + IReplicationStrategy repStrategy = ClusterProperties.INSTANCE.getReplicationStrategy(); IFaultToleranceStrategy ftStrategy = FaultToleranceStrategyFactory .create(ClusterProperties.INSTANCE.getCluster(), repStrategy, ccServiceCtx); @@ -133,10 +131,9 @@ componentProvider = new StorageComponentProvider(); GlobalRecoveryManager globalRecoveryManager = createGlobalRecoveryManager(); statementExecutorCtx = new StatementExecutorContext(); - appCtx = new CcApplicationContext(ccServiceCtx, getHcc(), libraryManager, resourceIdManager, - () -> MetadataManager.INSTANCE, globalRecoveryManager, ftStrategy, new ActiveNotificationHandler(), - componentProvider, new MetadataLockManager()); - ClusterStateManager.INSTANCE.setCcAppCtx(appCtx); + appCtx = new CcApplicationContext(ccServiceCtx, getHcc(), libraryManager, () -> MetadataManager.INSTANCE, + globalRecoveryManager, ftStrategy, new ActiveNotificationHandler(), componentProvider, + new MetadataLockManager()); ccExtensionManager = new CCExtensionManager(getExtensions()); appCtx.setExtensionManager(ccExtensionManager); final CCConfig ccConfig = controllerService.getCCConfig(); @@ -302,15 +299,15 @@ case Servlets.REBALANCE: return new RebalanceApiServlet(ctx, paths, appCtx); case Servlets.SHUTDOWN: - return new ShutdownApiServlet(ctx, paths); + return new ShutdownApiServlet(appCtx, ctx, paths); case Servlets.VERSION: return new VersionApiServlet(ctx, paths); case Servlets.CLUSTER_STATE: - return new ClusterApiServlet(ctx, paths); + return new ClusterApiServlet(appCtx, ctx, paths); case Servlets.CLUSTER_STATE_NODE_DETAIL: - return new NodeControllerDetailsApiServlet(ctx, paths); + return new NodeControllerDetailsApiServlet(appCtx, ctx, paths); case Servlets.CLUSTER_STATE_CC_DETAIL: - return new ClusterControllerDetailsApiServlet(ctx, paths); + return new ClusterControllerDetailsApiServlet(appCtx, ctx, paths); case Servlets.DIAGNOSTICS: return new DiagnosticsApiServlet(ctx, paths, appCtx); case Servlets.ACTIVE_STATS: @@ -331,7 +328,7 @@ @Override public void startupCompleted() throws Exception { ccServiceCtx.getControllerService().getExecutor().submit(() -> { - ClusterStateManager.INSTANCE.waitForState(ClusterState.ACTIVE); + appCtx.getClusterStateManager().waitForState(ClusterState.ACTIVE); ClusterManagerProvider.getClusterManager().notifyStartupCompleted(); return null; }); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java index 66f76c5..0583508 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java @@ -32,6 +32,7 @@ import org.apache.asterix.common.api.IClusterManagementWork; import org.apache.asterix.common.api.IClusterManagementWorkResponse; import org.apache.asterix.common.api.IClusterManagementWorkResponse.Status; +import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.config.ClusterProperties; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.event.schema.cluster.Node; @@ -42,7 +43,6 @@ import org.apache.asterix.metadata.cluster.RemoveNodeWork; import org.apache.asterix.metadata.cluster.RemoveNodeWorkResponse; import org.apache.asterix.runtime.utils.CcApplicationContext; -import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.hyracks.api.application.IClusterLifecycleListener; import org.apache.hyracks.api.config.IOption; import org.apache.hyracks.api.exceptions.HyracksException; @@ -70,10 +70,11 @@ if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("NC: " + nodeId + " joined"); } - ClusterStateManager.INSTANCE.addNCConfiguration(nodeId, ncConfiguration); + IClusterStateManager csm = appCtx.getClusterStateManager(); + csm.addNCConfiguration(nodeId, ncConfiguration); //if metadata node rejoining, we need to rebind the proxy connection when it is active again. - if (!ClusterStateManager.INSTANCE.isMetadataNodeActive()) { + if (!csm.isMetadataNodeActive()) { MetadataManager.INSTANCE.rebindMetadataNode(); } @@ -99,10 +100,11 @@ if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("NC: " + deadNode + " left"); } - ClusterStateManager.INSTANCE.removeNCConfiguration(deadNode); + IClusterStateManager csm = appCtx.getClusterStateManager(); + csm.removeNCConfiguration(deadNode); //if metadata node failed, we need to rebind the proxy connection when it is active again - if (!ClusterStateManager.INSTANCE.isMetadataNodeActive()) { + if (!csm.isMetadataNodeActive()) { MetadataManager.INSTANCE.rebindMetadataNode(); } } @@ -171,8 +173,9 @@ List<String> addedNodes = new ArrayList<>(); String asterixInstanceName = ClusterProperties.INSTANCE.getCluster().getInstanceName(); + IClusterStateManager csm = appCtx.getClusterStateManager(); for (int i = 0; i < nodesToAdd; i++) { - Node node = ClusterStateManager.INSTANCE.getAvailableSubstitutionNode(); + Node node = csm.getAvailableSubstitutionNode(); if (node != null) { try { ClusterManagerProvider.getClusterManager().addNode(appCtx, node); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java index 46968b4..2977a58 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java @@ -25,13 +25,13 @@ import java.util.logging.Logger; import org.apache.asterix.common.api.IClusterManagementWork; +import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.event.schema.cluster.Node; import org.apache.asterix.metadata.cluster.AddNodeWork; import org.apache.asterix.metadata.cluster.ClusterManagerProvider; import org.apache.asterix.metadata.cluster.RemoveNodeWork; -import org.apache.asterix.runtime.utils.ClusterStateManager; public class ClusterWorkExecutor implements Runnable { @@ -69,9 +69,10 @@ } } + IClusterStateManager csm = appCtx.getClusterStateManager(); Set<Node> addedNodes = new HashSet<>(); for (int i = 0; i < nodesToAdd; i++) { - Node node = ClusterStateManager.INSTANCE.getAvailableSubstitutionNode(); + Node node = csm.getAvailableSubstitutionNode(); if (node != null) { try { ClusterManagerProvider.getClusterManager().addNode(appCtx, node); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java index 9fc9940..8dc1b3e 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java @@ -33,6 +33,7 @@ import org.apache.asterix.active.message.ActiveManagerMessage; import org.apache.asterix.active.message.ActiveManagerMessage.Kind; import org.apache.asterix.app.translator.DefaultStatementExecutorFactory; +import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor; @@ -66,7 +67,6 @@ import org.apache.asterix.metadata.feeds.LocationConstraint; import org.apache.asterix.runtime.job.listener.JobEventListenerFactory; import org.apache.asterix.runtime.job.listener.MultiTransactionJobletEventListenerFactory; -import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.asterix.runtime.utils.RuntimeUtils; import org.apache.asterix.translator.CompiledStatements; import org.apache.asterix.translator.IStatementExecutor; @@ -134,8 +134,10 @@ public static JobSpecification buildRemoveFeedStorageJob(MetadataProvider metadataProvider, Feed feed) throws AsterixException { - JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext()); - AlgebricksAbsolutePartitionConstraint allCluster = ClusterStateManager.INSTANCE.getClusterLocations(); + ICcApplicationContext appCtx = metadataProvider.getApplicationContext(); + JobSpecification spec = RuntimeUtils.createJobSpecification(appCtx); + IClusterStateManager csm = appCtx.getClusterStateManager(); + AlgebricksAbsolutePartitionConstraint allCluster = csm.getClusterLocations(); Set<String> nodes = new TreeSet<>(); for (String node : allCluster.getLocations()) { nodes.add(node); @@ -143,7 +145,7 @@ AlgebricksAbsolutePartitionConstraint locations = new AlgebricksAbsolutePartitionConstraint(nodes.toArray(new String[nodes.size()])); FileSplit[] feedLogFileSplits = - FeedUtils.splitsForAdapter(feed.getDataverseName(), feed.getFeedName(), locations); + FeedUtils.splitsForAdapter(appCtx, feed.getDataverseName(), feed.getFeedName(), locations); org.apache.hyracks.algebricks.common.utils.Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spC = StoragePathUtil.splitProviderAndPartitionConstraints(feedLogFileSplits); FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(spec, spC.first, true); @@ -273,9 +275,8 @@ } // make connections between operators - for (Entry<ConnectorDescriptorId, - Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> entry - : subJob.getConnectorOperatorMap().entrySet()) { + for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> entry : subJob + .getConnectorOperatorMap().entrySet()) { ConnectorDescriptorId newId = connectorIdMapping.get(entry.getKey()); IConnectorDescriptor connDesc = jobSpec.getConnectorMap().get(newId); Pair<IOperatorDescriptor, Integer> leftOp = entry.getValue().getLeft(); diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java index c1421c5..53a4f23 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java @@ -405,7 +405,10 @@ IndexType.BTREE, keyFieldNames, keyFieldSourceIndicators, keyFieldTypes, false, false, true, MetadataUtil.PENDING_NO_OP); List<String> nodes = Collections.singletonList(ExecutionTestUtil.integrationUtil.ncs[0].getId()); - FileSplit[] splits = SplitsAndConstraintsUtil.getIndexSplits(dataset, index.getIndexName(), nodes); + FileSplit[] splits = SplitsAndConstraintsUtil.getIndexSplits( + ((ICcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext()) + .getClusterStateManager(), + dataset, index.getIndexName(), nodes); fileSplitProvider = new ConstantFileSplitProvider(Arrays.copyOfRange(splits, 0, 1)); } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java index 0aea84d..90dab96 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java @@ -18,6 +18,7 @@ */ package org.apache.asterix.common.api; +import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.config.ActiveProperties; import org.apache.asterix.common.config.BuildProperties; import org.apache.asterix.common.config.CompilerProperties; @@ -60,4 +61,9 @@ IServiceContext getServiceContext(); + /** + * @return the cluster state manager + */ + IClusterStateManager getClusterStateManager(); + } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java index 30675cd..b368c3b 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java @@ -19,12 +19,19 @@ package org.apache.asterix.common.cluster; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.asterix.common.api.IClusterManagementWork.ClusterState; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.event.schema.cluster.Node; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.api.config.IOption; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.HyracksException; + +import com.fasterxml.jackson.databind.node.ObjectNode; public interface IClusterStateManager { @@ -49,6 +56,7 @@ /** * Updates all partitions of {@code nodeId} based on the {@code active} flag. + * * @param nodeId * @param active * @throws HyracksDataException @@ -93,6 +101,7 @@ /** * Blocks until the cluster state becomes {@code state}, or timeout is exhausted. + * * @return true if the desired state was reached before timeout occurred */ boolean waitForState(ClusterState waitForState, long timeout, TimeUnit unit) @@ -116,4 +125,106 @@ * @throws HyracksDataException */ void deregisterNodePartitions(String nodeId) throws HyracksDataException; + + /** + * @return true if cluster is active, false otherwise + */ + boolean isClusterActive(); + + /** + * @return the set of participant nodes + */ + Set<String> getParticipantNodes(); + + /** + * Returns the IO devices configured for a Node Controller + * + * @param nodeId + * unique identifier of the Node Controller + * @return a list of IO devices. + */ + String[] getIODevices(String nodeId); + + /** + * @return the constraint representing all the partitions of the cluster + */ + AlgebricksAbsolutePartitionConstraint getClusterLocations(); + + /** + * @param excludePendingRemoval + * true, if the desired set shouldn't have pending removal nodes + * @return the set of participant nodes + */ + Set<String> getParticipantNodes(boolean excludePendingRemoval); + + /** + * @param node + * the node id + * @return the number of partitions on that node + */ + int getNodePartitionsCount(String node); + + /** + * @return a json object representing the cluster state summary + */ + ObjectNode getClusterStateSummary(); + + /** + * @return a json object representing the cluster state description + */ + ObjectNode getClusterStateDescription(); + + /** + * Set the cc application context + * + * @param appCtx + */ + void setCcAppCtx(ICcApplicationContext appCtx); + + /** + * @return the number of cluster nodes + */ + int getNumberOfNodes(); + + /** + * Add node configuration + * + * @param nodeId + * @param ncConfiguration + * @throws HyracksException + */ + void addNCConfiguration(String nodeId, Map<IOption, Object> ncConfiguration) throws HyracksException; + + /** + * @return true if metadata node is active, false otherwise + */ + boolean isMetadataNodeActive(); + + /** + * Remove configuration of a dead node + * + * @param deadNode + * @throws HyracksException + */ + void removeNCConfiguration(String deadNode) throws HyracksException; + + /** + * @return a substitution node or null + */ + Node getAvailableSubstitutionNode(); + + /** + * Add node to the list of nodes pending removal + * + * @param nodeId + */ + void removePending(String nodeId); + + /** + * Deregister intention to remove node id + * + * @param nodeId + * @return + */ + boolean cancelRemovePending(String nodeId); } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ClusterProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ClusterProperties.java index 0abb92f..b258a17 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ClusterProperties.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ClusterProperties.java @@ -43,6 +43,8 @@ private Cluster cluster; private ClusterProperties() { + Exception creation = new Exception("ClusterProperties is getting created"); + creation.printStackTrace(); InputStream is = this.getClass().getClassLoader().getResourceAsStream(CLUSTER_CONFIGURATION_FILE); if (is != null) { try { diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java index 3eff214..64de250 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java @@ -20,9 +20,9 @@ import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.common.api.IMetadataLockManager; -import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.cluster.IGlobalRecoveryManager; import org.apache.asterix.common.context.IStorageComponentProvider; +import org.apache.asterix.common.metadata.IMetadataBootstrap; import org.apache.asterix.common.replication.IFaultToleranceStrategy; import org.apache.asterix.common.transactions.IResourceIdManager; import org.apache.hyracks.api.application.ICCServiceContext; @@ -99,7 +99,7 @@ IMetadataLockManager getMetadataLockManager(); /** - * @return the cluster state manager + * @return the metadata bootstrap */ - IClusterStateManager getClusterStateManager(); + IMetadataBootstrap getMetadataBootstrap(); } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java index 2eb81d4..0a47788 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java @@ -25,6 +25,7 @@ import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.library.ILibraryManager; import org.apache.asterix.external.api.IAdapterFactory; import org.apache.asterix.external.api.IDataFlowController; @@ -92,8 +93,7 @@ public synchronized IDataSourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws HyracksDataException { INCServiceContext serviceCtx = ctx.getJobletContext().getServiceContext(); - INcApplicationContext appCtx = - (INcApplicationContext) serviceCtx.getApplicationContext(); + INcApplicationContext appCtx = (INcApplicationContext) serviceCtx.getApplicationContext(); try { restoreExternalObjects(serviceCtx, appCtx.getLibraryManager()); } catch (Exception e) { @@ -152,15 +152,16 @@ dataParserFactory.setMetaType(metaType); dataParserFactory.configure(configuration); ExternalDataCompatibilityUtils.validateCompatibility(dataSourceFactory, dataParserFactory); - configureFeedLogManager(); + configureFeedLogManager(appCtx); nullifyExternalObjects(); } - private void configureFeedLogManager() throws HyracksDataException, AlgebricksException { + private void configureFeedLogManager(IApplicationContext appCtx) throws HyracksDataException, AlgebricksException { this.isFeed = ExternalDataUtils.isFeed(configuration); if (isFeed) { - feedLogFileSplits = FeedUtils.splitsForAdapter(ExternalDataUtils.getDataverse(configuration), - ExternalDataUtils.getFeedName(configuration), dataSourceFactory.getPartitionConstraint()); + feedLogFileSplits = FeedUtils.splitsForAdapter((ICcApplicationContext) appCtx, + ExternalDataUtils.getDataverse(configuration), ExternalDataUtils.getFeedName(configuration), + dataSourceFactory.getPartitionConstraint()); } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java index edda448..7cfbf51 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java @@ -25,8 +25,9 @@ import java.util.Set; import org.apache.asterix.common.api.IApplicationContext; +import org.apache.asterix.common.cluster.IClusterStateManager; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.AsterixException; -import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.application.IServiceContext; @@ -87,6 +88,7 @@ public static AlgebricksAbsolutePartitionConstraint getPartitionConstraints(IApplicationContext appCtx, AlgebricksAbsolutePartitionConstraint constraints, int count) throws AlgebricksException { if (constraints == null) { + IClusterStateManager clusterStateManager = ((ICcApplicationContext) appCtx).getClusterStateManager(); ArrayList<String> locs = new ArrayList<>(); Set<String> stores = appCtx.getMetadataProperties().getStores().keySet(); if (stores.isEmpty()) { @@ -97,7 +99,7 @@ Iterator<String> storeIt = stores.iterator(); while (storeIt.hasNext()) { String node = storeIt.next(); - int numIODevices = ClusterStateManager.INSTANCE.getIODevices(node).length; + int numIODevices = clusterStateManager.getIODevices(node).length; for (int k = 0; k < numIODevices; k++) { locs.add(node); i++; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java index 12be449..c7b8633 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java @@ -23,10 +23,10 @@ import java.util.List; import java.util.Map; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.external.api.AsterixInputStream; import org.apache.asterix.external.api.IInputStreamFactory; import org.apache.asterix.external.input.stream.TwitterFirehoseInputStream; -import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.api.application.IServiceContext; import org.apache.hyracks.api.context.IHyracksTaskContext; @@ -54,6 +54,7 @@ private static final String KEY_INGESTION_LOCATIONS = "ingestion-location"; private Map<String, String> configuration; + private transient IServiceContext serviceCtx; @Override public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() { @@ -67,10 +68,10 @@ if (ingestionCardinalityParam != null) { count = Integer.parseInt(ingestionCardinalityParam); } - + ICcApplicationContext appCtx = (ICcApplicationContext) serviceCtx.getApplicationContext(); List<String> chosenLocations = new ArrayList<>(); String[] availableLocations = locations != null ? locations - : ClusterStateManager.INSTANCE.getParticipantNodes().toArray(new String[] {}); + : appCtx.getClusterStateManager().getParticipantNodes().toArray(new String[] {}); for (int i = 0, k = 0; i < count; i++, k = (k + 1) % availableLocations.length) { chosenLocations.add(availableLocations[k]); } @@ -84,6 +85,7 @@ @Override public void configure(IServiceContext serviceCtx, Map<String, String> configuration) { + this.serviceCtx = serviceCtx; this.configuration = configuration; } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java index ec7de91..dad0d51 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java @@ -26,9 +26,9 @@ import org.apache.asterix.common.cluster.ClusterPartition; import org.apache.asterix.common.config.ClusterProperties; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.utils.StoragePathUtil; -import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint.PartitionConstraintType; @@ -63,9 +63,9 @@ } public enum Mode { - PROCESS, // There is memory - SPILL, // Memory budget has been consumed. Now we're writing to disk - DISCARD // Memory and Disk space budgets have been consumed. Now we're discarding + PROCESS, // There is memory + SPILL, // Memory budget has been consumed. Now we're writing to disk + DISCARD // Memory and Disk space budgets have been consumed. Now we're discarding } private FeedUtils() { @@ -87,7 +87,7 @@ return StoragePathUtil.getFileSplitForClusterPartition(partition, f.getPath()); } - public static FileSplit[] splitsForAdapter(String dataverseName, String feedName, + public static FileSplit[] splitsForAdapter(ICcApplicationContext appCtx, String dataverseName, String feedName, AlgebricksPartitionConstraint partitionConstraints) throws AsterixException { if (partitionConstraints.getPartitionConstraintType() == PartitionConstraintType.COUNT) { throw new AsterixException("Can't create file splits for adapter with count partitioning constraints"); @@ -96,7 +96,7 @@ List<FileSplit> splits = new ArrayList<>(); for (String nd : locations) { splits.add(splitsForAdapter(dataverseName, feedName, nd, - ClusterStateManager.INSTANCE.getNodePartitions(nd)[0])); + appCtx.getClusterStateManager().getNodePartitions(nd)[0])); } return splits.toArray(new FileSplit[] {}); } @@ -113,8 +113,8 @@ public static FeedLogManager getFeedLogManager(IHyracksTaskContext ctx, FileSplit feedLogFileSplit) throws HyracksDataException { - return new FeedLogManager(FeedUtils.getAbsoluteFileRef(feedLogFileSplit.getPath(), - 0, ctx.getIoManager()).getFile()); + return new FeedLogManager( + FeedUtils.getAbsoluteFileRef(feedLogFileSplit.getPath(), 0, ctx.getIoManager()).getFile()); } public static void processFeedMessage(ByteBuffer input, VSizeFrame message, FrameTupleAccessor fta) diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java index b4353e7..bd50352 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java @@ -25,7 +25,9 @@ import java.util.Map; import org.apache.asterix.common.api.IApplicationContext; +import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.external.indexing.ExternalFile; @@ -33,7 +35,6 @@ import org.apache.asterix.external.indexing.RecordId.RecordIdType; import org.apache.asterix.external.input.stream.HDFSInputStream; import org.apache.asterix.hivecompat.io.RCFileInputFormat; -import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -207,10 +208,12 @@ public static AlgebricksAbsolutePartitionConstraint getPartitionConstraints(IApplicationContext appCtx, AlgebricksAbsolutePartitionConstraint clusterLocations) { if (clusterLocations == null) { + IClusterStateManager clusterStateManager = ((ICcApplicationContext) appCtx).getClusterStateManager(); ArrayList<String> locs = new ArrayList<>(); Map<String, String[]> stores = appCtx.getMetadataProperties().getStores(); for (String node : stores.keySet()) { - int numIODevices = ClusterStateManager.INSTANCE.getIODevices(node).length; + + int numIODevices = clusterStateManager.getIODevices(node).length; for (int k = 0; k < numIODevices; k++) { locs.add(node); } diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java index c45941d..c09b9eb 100644 --- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java +++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java @@ -23,9 +23,9 @@ import java.util.List; import java.util.Map; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.external.api.IRecordReader; import org.apache.asterix.external.api.IRecordReaderFactory; -import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.api.application.IServiceContext; import org.apache.hyracks.api.context.IHyracksTaskContext; @@ -43,17 +43,23 @@ private int upsertCycle = 0; private int numOfReaders; private transient AlgebricksAbsolutePartitionConstraint clusterLocations; + private transient IServiceContext serviceCtx; private static final List<String> recordReaderNames = Collections.unmodifiableList(Arrays.asList()); @Override public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() { - clusterLocations = ClusterStateManager.INSTANCE.getClusterLocations(); - numOfReaders = clusterLocations.getLocations().length; + if (clusterLocations == null) { + ICcApplicationContext appCtx = (ICcApplicationContext) serviceCtx.getApplicationContext(); + clusterLocations = appCtx.getClusterStateManager().getClusterLocations(); + numOfReaders = clusterLocations.getLocations().length; + } return clusterLocations; + } @Override public void configure(IServiceContext serviceCtx, final Map<String, String> configuration) { + this.serviceCtx = serviceCtx; if (configuration.containsKey("num-of-records")) { numOfRecords = Integer.parseInt(configuration.get("num-of-records")); } @@ -83,7 +89,8 @@ return DCPRequest.class; } - @Override public List<String> getRecordReaderNames() { + @Override + public List<String> getRecordReaderNames() { return recordReaderNames; } } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/hints/DatasetHints.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/hints/DatasetHints.java index 709f655..0f97194 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/hints/DatasetHints.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/hints/DatasetHints.java @@ -21,8 +21,8 @@ import java.util.HashSet; import java.util.Set; +import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.dataflow.ICcApplicationContext; -import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.hyracks.algebricks.common.utils.Pair; /** @@ -113,7 +113,8 @@ if (intValue < 0) { return new Pair<>(false, "Value must be >= 0"); } - int numNodesInCluster = ClusterStateManager.INSTANCE.getParticipantNodes(true).size(); + IClusterStateManager csm = appCtx.getClusterStateManager(); + int numNodesInCluster = csm.getParticipantNodes(true).size(); if (numNodesInCluster < intValue) { return new Pair<>(false, "Value must be less than or equal to the available number of nodes in cluster (" diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java index 26cec1e..0b6608c 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java @@ -22,7 +22,7 @@ import java.util.List; import org.apache.asterix.active.EntityId; -import org.apache.asterix.external.feed.api.IFeed; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.external.feed.management.FeedConnectionId; import org.apache.asterix.external.operators.FeedCollectOperatorDescriptor; import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType; @@ -34,7 +34,6 @@ import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.IAType; import org.apache.asterix.runtime.formats.NonTaggedDataFormat; -import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; @@ -63,12 +62,12 @@ private final List<ScalarFunctionCallExpression> keyAccessExpression; private final FeedConnection feedConnection; - public FeedDataSource(Feed feed, DataSourceId id, String targetDataset, IAType itemType, IAType metaType, - List<IAType> pkTypes, List<List<String>> partitioningKeys, - List<ScalarFunctionCallExpression> keyAccessExpression, EntityId sourceFeedId, - FeedRuntimeType location, String[] locations, INodeDomain domain, FeedConnection feedConnection) - throws AlgebricksException { + public FeedDataSource(MetadataProvider metadataProvider, Feed feed, DataSourceId id, String targetDataset, + IAType itemType, IAType metaType, List<IAType> pkTypes, + List<ScalarFunctionCallExpression> keyAccessExpression, EntityId sourceFeedId, FeedRuntimeType location, + String[] locations, INodeDomain domain, FeedConnection feedConnection) throws AlgebricksException { super(id, itemType, metaType, Type.FEED, domain); + ICcApplicationContext appCtx = metadataProvider.getApplicationContext(); this.feed = feed; this.targetDataset = targetDataset; this.sourceFeedId = sourceFeedId; @@ -76,7 +75,7 @@ this.locations = locations; this.pkTypes = pkTypes; this.keyAccessExpression = keyAccessExpression; - this.computeCardinality = ClusterStateManager.INSTANCE.getParticipantNodes().size(); + this.computeCardinality = appCtx.getClusterStateManager().getParticipantNodes().size(); this.feedConnection = feedConnection; initFeedDataSource(); } @@ -170,8 +169,8 @@ throws AlgebricksException { try { ARecordType feedOutputType = (ARecordType) itemType; - ISerializerDeserializer payloadSerde = NonTaggedDataFormat.INSTANCE.getSerdeProvider() - .getSerializerDeserializer(feedOutputType); + ISerializerDeserializer payloadSerde = + NonTaggedDataFormat.INSTANCE.getSerdeProvider().getSerializerDeserializer(feedOutputType); ArrayList<ISerializerDeserializer> serdes = new ArrayList<>(); serdes.add(payloadSerde); if (metaItemType != null) { @@ -182,16 +181,16 @@ serdes.add(SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(type)); } } - RecordDescriptor feedDesc = new RecordDescriptor( - serdes.toArray(new ISerializerDeserializer[serdes.size()])); - FeedPolicyEntity feedPolicy = (FeedPolicyEntity) getProperties() - .get(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY); + RecordDescriptor feedDesc = + new RecordDescriptor(serdes.toArray(new ISerializerDeserializer[serdes.size()])); + FeedPolicyEntity feedPolicy = + (FeedPolicyEntity) getProperties().get(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY); if (feedPolicy == null) { throw new AlgebricksException("Feed not configured with a policy"); } feedPolicy.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy.getPolicyName()); - FeedConnectionId feedConnectionId = new FeedConnectionId(getId().getDataverseName(), - getId().getDatasourceName(), getTargetDataset()); + FeedConnectionId feedConnectionId = + new FeedConnectionId(getId().getDataverseName(), getId().getDatasourceName(), getTargetDataset()); FeedCollectOperatorDescriptor feedCollector = new FeedCollectOperatorDescriptor(jobSpec, feedConnectionId, feedOutputType, feedDesc, feedPolicy.getProperties(), getLocation()); diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java index c23755d..97c6ed2 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.List; +import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.exceptions.MetadataException; import org.apache.asterix.metadata.MetadataManager; @@ -36,7 +37,6 @@ import org.apache.asterix.metadata.utils.MetadataConstants; import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.IAType; -import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.core.algebra.properties.DefaultNodeGroupDomain; import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain; @@ -111,12 +111,12 @@ return dataset; } - public static INodeDomain findNodeDomain(MetadataTransactionContext mdTxnCtx, String nodeGroupName) - throws AlgebricksException { + public static INodeDomain findNodeDomain(IClusterStateManager clusterStateManager, + MetadataTransactionContext mdTxnCtx, String nodeGroupName) throws AlgebricksException { NodeGroup nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodeGroupName); List<String> partitions = new ArrayList<>(); for (String location : nodeGroup.getNodeNames()) { - int numPartitions = ClusterStateManager.INSTANCE.getNodePartitionsCount(location); + int numPartitions = clusterStateManager.getNodePartitionsCount(location); for (int i = 0; i < numPartitions; i++) { partitions.add(location); } @@ -165,24 +165,24 @@ } } - public static DataSource findDataSource(MetadataTransactionContext mdTxnCtx, DataSourceId id) - throws AlgebricksException { + public static DataSource findDataSource(IClusterStateManager clusterStateManager, + MetadataTransactionContext mdTxnCtx, DataSourceId id) throws AlgebricksException { try { - return lookupSourceInMetadata(mdTxnCtx, id); + return lookupSourceInMetadata(clusterStateManager, mdTxnCtx, id); } catch (MetadataException e) { throw new AlgebricksException(e); } } - public static DataSource lookupSourceInMetadata(MetadataTransactionContext mdTxnCtx, DataSourceId aqlId) - throws AlgebricksException { + public static DataSource lookupSourceInMetadata(IClusterStateManager clusterStateManager, + MetadataTransactionContext mdTxnCtx, DataSourceId aqlId) throws AlgebricksException { Dataset dataset = findDataset(mdTxnCtx, aqlId.getDataverseName(), aqlId.getDatasourceName()); if (dataset == null) { throw new AlgebricksException("Datasource with id " + aqlId + " was not found."); } IAType itemType = findType(mdTxnCtx, dataset.getItemTypeDataverseName(), dataset.getItemTypeName()); IAType metaItemType = findType(mdTxnCtx, dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName()); - INodeDomain domain = findNodeDomain(mdTxnCtx, dataset.getNodeGroupName()); + INodeDomain domain = findNodeDomain(clusterStateManager, mdTxnCtx, dataset.getNodeGroupName()); byte datasourceType = dataset.getDatasetType().equals(DatasetType.EXTERNAL) ? DataSource.Type.EXTERNAL_DATASET : DataSource.Type.INTERNAL_DATASET; return new DatasetDataSource(aqlId, dataset, itemType, metaItemType, datasourceType, diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java index 8971a90..d6a3f21 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; +import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp; import org.apache.asterix.common.config.DatasetConfig.IndexType; @@ -84,7 +85,6 @@ import org.apache.asterix.runtime.formats.FormatUtils; import org.apache.asterix.runtime.job.listener.JobEventListenerFactory; import org.apache.asterix.runtime.operators.LSMSecondaryUpsertOperatorDescriptor; -import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; @@ -295,7 +295,7 @@ } public INodeDomain findNodeDomain(String nodeGroupName) throws AlgebricksException { - return MetadataManagerUtil.findNodeDomain(mdTxnCtx, nodeGroupName); + return MetadataManagerUtil.findNodeDomain(appCtx.getClusterStateManager(), mdTxnCtx, nodeGroupName); } public List<String> findNodes(String nodeGroupName) throws AlgebricksException { @@ -329,11 +329,11 @@ @Override public DataSource findDataSource(DataSourceId id) throws AlgebricksException { - return MetadataManagerUtil.findDataSource(mdTxnCtx, id); + return MetadataManagerUtil.findDataSource(appCtx.getClusterStateManager(), mdTxnCtx, id); } public DataSource lookupSourceInMetadata(DataSourceId aqlId) throws AlgebricksException { - return MetadataManagerUtil.lookupSourceInMetadata(mdTxnCtx, aqlId); + return MetadataManagerUtil.lookupSourceInMetadata(appCtx.getClusterStateManager(), mdTxnCtx, aqlId); } @Override @@ -709,8 +709,9 @@ int numPartitions = 0; List<String> nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName()).getNodeNames(); + IClusterStateManager csm = appCtx.getClusterStateManager(); for (String nd : nodeGroup) { - numPartitions += ClusterStateManager.INSTANCE.getNodePartitionsCount(nd); + numPartitions += csm.getNodePartitionsCount(nd); } return numElementsHint / numPartitions; } @@ -755,12 +756,13 @@ } public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitAndConstraints(String dataverse) { - return SplitsAndConstraintsUtil.getDataverseSplitProviderAndConstraints(dataverse); + return SplitsAndConstraintsUtil.getDataverseSplitProviderAndConstraints(appCtx.getClusterStateManager(), + dataverse); } public FileSplit[] splitsForIndex(MetadataTransactionContext mdTxnCtx, Dataset dataset, String indexName) throws AlgebricksException { - return SplitsAndConstraintsUtil.getIndexSplits(dataset, indexName, mdTxnCtx); + return SplitsAndConstraintsUtil.getIndexSplits(appCtx.getClusterStateManager(), dataset, indexName, mdTxnCtx); } public DatasourceAdapter getAdapter(MetadataTransactionContext mdTxnCtx, String dataverseName, String adapterName) @@ -777,7 +779,7 @@ } public AlgebricksAbsolutePartitionConstraint getClusterLocations() { - return ClusterStateManager.INSTANCE.getClusterLocations(); + return appCtx.getClusterStateManager().getClusterLocations(); } public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDataLookupRuntime( diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java index 6825f10..5b7ea59 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java @@ -23,6 +23,7 @@ import java.util.List; import org.apache.asterix.common.cluster.ClusterPartition; +import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.config.ClusterProperties; import org.apache.asterix.common.exceptions.MetadataException; import org.apache.asterix.common.utils.StoragePathUtil; @@ -30,7 +31,6 @@ import org.apache.asterix.metadata.MetadataTransactionContext; import org.apache.asterix.metadata.entities.Dataset; import org.apache.asterix.metadata.entities.NodeGroup; -import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.utils.Pair; @@ -42,11 +42,11 @@ private SplitsAndConstraintsUtil() { } - private static FileSplit[] getDataverseSplits(String dataverseName) { + private static FileSplit[] getDataverseSplits(IClusterStateManager clusterStateManager, String dataverseName) { File relPathFile = new File(dataverseName); List<FileSplit> splits = new ArrayList<>(); // get all partitions - ClusterPartition[] clusterPartition = ClusterStateManager.INSTANCE.getClusterPartitons(); + ClusterPartition[] clusterPartition = clusterStateManager.getClusterPartitons(); String storageDirName = ClusterProperties.INSTANCE.getStorageDirectoryName(); for (int j = 0; j < clusterPartition.length; j++) { File f = new File( @@ -57,28 +57,29 @@ return splits.toArray(new FileSplit[] {}); } - public static FileSplit[] getIndexSplits(Dataset dataset, String indexName, MetadataTransactionContext mdTxnCtx) - throws AlgebricksException { + public static FileSplit[] getIndexSplits(IClusterStateManager clusterStateManager, Dataset dataset, + String indexName, MetadataTransactionContext mdTxnCtx) throws AlgebricksException { try { NodeGroup nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName()); if (nodeGroup == null) { throw new AlgebricksException("Couldn't find node group " + dataset.getNodeGroupName()); } List<String> nodeList = nodeGroup.getNodeNames(); - return getIndexSplits(dataset, indexName, nodeList); + return getIndexSplits(clusterStateManager, dataset, indexName, nodeList); } catch (MetadataException me) { throw new AlgebricksException(me); } } - public static FileSplit[] getIndexSplits(Dataset dataset, String indexName, List<String> nodes) { + public static FileSplit[] getIndexSplits(IClusterStateManager clusterStateManager, Dataset dataset, + String indexName, List<String> nodes) { File relPathFile = new File(StoragePathUtil.prepareDataverseIndexName(dataset.getDataverseName(), dataset.getDatasetName(), indexName, dataset.getRebalanceCount())); String storageDirName = ClusterProperties.INSTANCE.getStorageDirectoryName(); List<FileSplit> splits = new ArrayList<>(); for (String nd : nodes) { - int numPartitions = ClusterStateManager.INSTANCE.getNodePartitionsCount(nd); - ClusterPartition[] nodePartitions = ClusterStateManager.INSTANCE.getNodePartitions(nd); + int numPartitions = clusterStateManager.getNodePartitionsCount(nd); + ClusterPartition[] nodePartitions = clusterStateManager.getNodePartitions(nd); // currently this case is never executed since the metadata group doesn't exists if (dataset.getNodeGroupName().compareTo(MetadataConstants.METADATA_NODEGROUP_NAME) == 0) { numPartitions = 1; @@ -97,8 +98,8 @@ } public static Pair<IFileSplitProvider, AlgebricksPartitionConstraint> getDataverseSplitProviderAndConstraints( - String dataverse) { - FileSplit[] splits = getDataverseSplits(dataverse); + IClusterStateManager clusterStateManager, String dataverse) { + FileSplit[] splits = getDataverseSplits(clusterStateManager, dataverse); return StoragePathUtil.splitProviderAndPartitionConstraints(splits); } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java index 194fd59..82a1177 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java @@ -20,11 +20,11 @@ import java.util.Set; +import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.messaging.api.ICCMessageBroker; import org.apache.asterix.common.messaging.api.ICcAddressedMessage; import org.apache.asterix.common.transactions.IResourceIdManager; -import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.hyracks.api.exceptions.HyracksDataException; public class ResourceIdRequestMessage implements ICcAddressedMessage { @@ -40,7 +40,8 @@ try { ICCMessageBroker broker = (ICCMessageBroker) appCtx.getServiceContext().getMessageBroker(); ResourceIdRequestResponseMessage reponse = new ResourceIdRequestResponseMessage(); - if (!ClusterStateManager.INSTANCE.isClusterActive()) { + IClusterStateManager clusterStateManager = appCtx.getClusterStateManager(); + if (!clusterStateManager.isClusterActive()) { reponse.setResourceId(-1); reponse.setException(new Exception("Cannot generate global resource id when cluster is not active.")); } else { @@ -49,7 +50,7 @@ if (reponse.getResourceId() < 0) { reponse.setException(new Exception("One or more nodes has not reported max resource id.")); } - requestMaxResourceID(resourceIdManager, broker); + requestMaxResourceID(clusterStateManager, resourceIdManager, broker); } broker.sendApplicationMessageToNC(reponse, src); } catch (Exception e) { @@ -57,8 +58,9 @@ } } - private void requestMaxResourceID(IResourceIdManager resourceIdManager, ICCMessageBroker broker) throws Exception { - Set<String> getParticipantNodes = ClusterStateManager.INSTANCE.getParticipantNodes(); + private void requestMaxResourceID(IClusterStateManager clusterStateManager, IResourceIdManager resourceIdManager, + ICCMessageBroker broker) throws Exception { + Set<String> getParticipantNodes = clusterStateManager.getParticipantNodes(); ReportMaxResourceIdRequestMessage msg = new ReportMaxResourceIdRequestMessage(); for (String nodeId : getParticipantNodes) { if (!resourceIdManager.reported(nodeId)) { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/ResourceIdManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java similarity index 84% rename from asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/ResourceIdManager.java rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java index 372404c..6a5ed08 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/ResourceIdManager.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java @@ -16,27 +16,32 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.asterix.app.cc; +package org.apache.asterix.runtime.transaction; import java.util.HashSet; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; +import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.transactions.IResourceIdManager; -import org.apache.asterix.runtime.utils.ClusterStateManager; public class ResourceIdManager implements IResourceIdManager { + private final IClusterStateManager csm; private final AtomicLong globalResourceId = new AtomicLong(); private volatile Set<String> reportedNodes = new HashSet<>(); private volatile boolean allReported = false; + + public ResourceIdManager(IClusterStateManager csm) { + this.csm = csm; + } @Override public long createResourceId() { if (!allReported) { synchronized (this) { if (!allReported) { - if (reportedNodes.size() < ClusterStateManager.INSTANCE.getNumberOfNodes()) { + if (reportedNodes.size() < csm.getNumberOfNodes()) { return -1; } else { reportedNodes = null; @@ -58,7 +63,7 @@ if (!allReported) { globalResourceId.set(Math.max(maxResourceId, globalResourceId.get())); reportedNodes.add(nodeId); - if (reportedNodes.size() == ClusterStateManager.INSTANCE.getNumberOfNodes()) { + if (reportedNodes.size() == csm.getNumberOfNodes()) { reportedNodes = null; allReported = true; } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java index 28c480f..e4cc7f4 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java @@ -43,6 +43,7 @@ import org.apache.asterix.common.metadata.IMetadataBootstrap; import org.apache.asterix.common.replication.IFaultToleranceStrategy; import org.apache.asterix.common.transactions.IResourceIdManager; +import org.apache.asterix.runtime.transaction.ResourceIdManager; import org.apache.hyracks.api.application.ICCServiceContext; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.job.IJobLifecycleListener; @@ -77,17 +78,16 @@ private IFaultToleranceStrategy ftStrategy; private IJobLifecycleListener activeLifeCycleListener; private IMetadataLockManager mdLockManager; + private IClusterStateManager clusterStateManager; public CcApplicationContext(ICCServiceContext ccServiceCtx, IHyracksClientConnection hcc, - ILibraryManager libraryManager, IResourceIdManager resourceIdManager, - Supplier<IMetadataBootstrap> metadataBootstrapSupplier, IGlobalRecoveryManager globalRecoveryManager, - IFaultToleranceStrategy ftStrategy, IJobLifecycleListener activeLifeCycleListener, - IStorageComponentProvider storageComponentProvider, IMetadataLockManager mdLockManager) - throws AsterixException, IOException { + ILibraryManager libraryManager, Supplier<IMetadataBootstrap> metadataBootstrapSupplier, + IGlobalRecoveryManager globalRecoveryManager, IFaultToleranceStrategy ftStrategy, + IJobLifecycleListener activeLifeCycleListener, IStorageComponentProvider storageComponentProvider, + IMetadataLockManager mdLockManager) throws AsterixException, IOException { this.ccServiceCtx = ccServiceCtx; this.hcc = hcc; this.libraryManager = libraryManager; - this.resourceIdManager = resourceIdManager; this.activeLifeCycleListener = activeLifeCycleListener; // Determine whether to use old-style asterix-configuration.xml or new-style configuration. // QQQ strip this out eventually @@ -109,6 +109,9 @@ this.globalRecoveryManager = globalRecoveryManager; this.storageComponentProvider = storageComponentProvider; this.mdLockManager = mdLockManager; + clusterStateManager = new ClusterStateManager(); + clusterStateManager.setCcAppCtx(this); + this.resourceIdManager = new ResourceIdManager(clusterStateManager); } @Override @@ -204,6 +207,7 @@ return resourceIdManager; } + @Override public IMetadataBootstrap getMetadataBootstrap() { return metadataBootstrapSupplier.get(); } @@ -230,6 +234,6 @@ @Override public IClusterStateManager getClusterStateManager() { - return ClusterStateManager.INSTANCE; + return clusterStateManager; } } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java index cdb3112..2042c7c 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java @@ -36,6 +36,7 @@ import org.apache.asterix.common.cluster.ClusterPartition; import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.config.ClusterProperties; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.replication.IFaultToleranceStrategy; @@ -66,7 +67,6 @@ */ private static final Logger LOGGER = Logger.getLogger(ClusterStateManager.class.getName()); - public static final ClusterStateManager INSTANCE = new ClusterStateManager(); private final Map<String, Map<IOption, Object>> activeNcConfiguration = new HashMap<>(); private Set<String> pendingRemoval = new HashSet<>(); private final Cluster cluster; @@ -78,13 +78,16 @@ private boolean metadataNodeActive = false; private Set<String> failedNodes = new HashSet<>(); private IFaultToleranceStrategy ftStrategy; - private CcApplicationContext appCtx; + private ICcApplicationContext appCtx; - private ClusterStateManager() { + public ClusterStateManager() { + Exception e = new Exception(); + LOGGER.log(Level.WARNING, "Creating Cluster State manager", e); cluster = ClusterProperties.INSTANCE.getCluster(); } - public void setCcAppCtx(CcApplicationContext appCtx) { + @Override + public void setCcAppCtx(ICcApplicationContext appCtx) { this.appCtx = appCtx; node2PartitionsMap = appCtx.getMetadataProperties().getNodePartitions(); clusterPartitions = appCtx.getMetadataProperties().getClusterPartitions(); @@ -93,6 +96,7 @@ ftStrategy.bindTo(this); } + @Override public synchronized void removeNCConfiguration(String nodeId) throws HyracksException { if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("Removing configuration parameters for node id " + nodeId); @@ -102,6 +106,7 @@ pendingRemoval.remove(nodeId); } + @Override public synchronized void addNCConfiguration(String nodeId, Map<IOption, Object> configuration) throws HyracksException { if (LOGGER.isLoggable(Level.INFO)) { @@ -209,13 +214,7 @@ return true; } - /** - * Returns the IO devices configured for a Node Controller - * - * @param nodeId - * unique identifier of the Node Controller - * @return a list of IO devices. - */ + @Override public synchronized String[] getIODevices(String nodeId) { Map<IOption, Object> ncConfig = activeNcConfiguration.get(nodeId); if (ncConfig == null) { @@ -233,11 +232,13 @@ return state; } + @Override public synchronized Node getAvailableSubstitutionNode() { List<Node> subNodes = cluster.getSubstituteNodes() == null ? null : cluster.getSubstituteNodes().getNode(); return subNodes == null || subNodes.isEmpty() ? null : subNodes.get(0); } + @Override public synchronized Set<String> getParticipantNodes() { Set<String> participantNodes = new HashSet<>(); for (String pNode : activeNcConfiguration.keySet()) { @@ -246,6 +247,7 @@ return participantNodes; } + @Override public synchronized Set<String> getParticipantNodes(boolean excludePendingRemoval) { Set<String> participantNodes = getParticipantNodes(); if (excludePendingRemoval) { @@ -254,6 +256,7 @@ return participantNodes; } + @Override public synchronized AlgebricksAbsolutePartitionConstraint getClusterLocations() { if (clusterPartitionConstraint == null) { resetClusterPartitionConstraint(); @@ -272,6 +275,7 @@ new AlgebricksAbsolutePartitionConstraint(clusterActiveLocations.toArray(new String[] {})); } + @Override public synchronized boolean isClusterActive() { if (cluster == null) { // this is a virtual cluster @@ -280,6 +284,7 @@ return state == ClusterState.ACTIVE; } + @Override public int getNumberOfNodes() { return appCtx.getMetadataProperties().getNodeNames().size(); } @@ -289,6 +294,7 @@ return node2PartitionsMap.get(nodeId); } + @Override public synchronized int getNodePartitionsCount(String node) { if (node2PartitionsMap.containsKey(node)) { return node2PartitionsMap.get(node).length; @@ -305,10 +311,12 @@ return partitons.toArray(new ClusterPartition[] {}); } + @Override public synchronized boolean isMetadataNodeActive() { return metadataNodeActive; } + @Override public synchronized ObjectNode getClusterStateDescription() { ObjectMapper om = new ObjectMapper(); ObjectNode stateDescription = om.createObjectNode(); @@ -342,6 +350,7 @@ return stateDescription; } + @Override public synchronized ObjectNode getClusterStateSummary() { ObjectMapper om = new ObjectMapper(); ObjectNode stateDescription = om.createObjectNode(); @@ -395,6 +404,7 @@ } } + @Override public synchronized void removePending(String nodeId) { if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("Registering intention to remove node id " + nodeId); @@ -406,6 +416,7 @@ } } + @Override public synchronized boolean cancelRemovePending(String nodeId) { if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("Deregistering intention to remove node id " + nodeId); @@ -423,8 +434,8 @@ } private void updateNodeConfig(String nodeId, Map<IOption, Object> configuration) { - ConfigManager configManager = ((ConfigManagerApplicationConfig) appCtx.getServiceContext().getAppConfig()) - .getConfigManager(); + ConfigManager configManager = + ((ConfigManagerApplicationConfig) appCtx.getServiceContext().getAppConfig()).getConfigManager(); for (Map.Entry<IOption, Object> entry : configuration.entrySet()) { if (entry.getKey().section() == Section.NC) { configManager.set(nodeId, entry.getKey(), entry.getValue()); -- To view, visit https://asterix-gerrit.ics.uci.edu/1944 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Id6532245033ac4c6f6aa9f193539944eecb832f7 Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com>