Murtadha Hubail has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/2029
Change subject: [ASTERIXDB-2107][CLUS] Prevent Invalid UNUSABLE State in
Dynamic Topology
......................................................................
[ASTERIXDB-2107][CLUS] Prevent Invalid UNUSABLE State in Dynamic Topology
- user model changes: no
- storage format changes: no
- interface changes: yes
Renamed IClusterStateManager methods addNCConfiguration/removeNCConfiguration
to notifyNodeJoin/notifyNodeFailure, and getActiveNcConfiguration to getNcConfiguration.
Details:
- Mark a node as a participant only once it completes its startup,
not when it joins the cluster.
- Mark partitions registered for dynamically added nodes as pending activation,
so they do not make the cluster UNUSABLE before their node completes startup.
- Report the number of nodes from the current participant nodes instead of
the statically configured MetadataProperties node names.
- Add ClusterStateManagerTest covering fixed and dynamic cluster topologies.
Change-Id: I7a0db2d66cf44650dcc673b3f2de537816cb84c7
---
A CMakeLists.txt
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FaultToleranceUtil.java
A
asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
8 files changed, 339 insertions(+), 40 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/29/2029/1
diff --git a/CMakeLists.txt b/CMakeLists.txt
new file mode 100644
index 0000000..69e4321
--- /dev/null
+++ b/CMakeLists.txt
@@ -0,0 +1,63 @@
+#
+# Copyright 2017 Couchbase, Inc.
+#
+# "Top-level" CMakeLists.txt for Couchbase Analytics
+#
+# This CMakeLists.txt is intended to sit at the top-level
+# asterixdb/ directory! The repo manifest will copy it there.
+
+SET (_all "")
+IF (CB_INVOKE_MAVEN)
+ SET (_all "ALL")
+ENDIF ()
+ADD_CUSTOM_TARGET(analytics ${_all} # aggregate target; part of the default (ALL) build only when CB_INVOKE_MAVEN is set
+ DEPENDS analytics-maven # NOTE(review): add_custom_target DEPENDS is documented for files/custom-command outputs, while the rest of this file orders on analytics-maven-build via ADD_DEPENDENCIES -- confirm this name
+ COMMAND ${CMAKE_COMMAND} -E echo QQQQQQQQQQQ FINISHED ANALYTICS)
+
+# Maven-build :cbas-installer and the modules it needs (-am), skipping tests and the RAT check; <install prefix>/lib is passed as cbasInstallationTarget
+MAVEN_PROJECT (TARGET analytics-maven
+ GOAL package
+ OPTS -B -pl :cbas-installer -am -DskipTests
"-DcbasInstallationTarget=${CMAKE_INSTALL_PREFIX}/lib" -Drat.skip)
+
+# Production builds only: set the Maven project versions to TARGET_MAVEN_VERSION before the analytics-maven build runs
+IF ("$ENV{PRODUCT}" STREQUAL "server-analytics") # update this test once
MB-25715 is addressed
+ SET(VERSION_PLUGIN "org.codehaus.mojo:versions-maven-plugin:2.4")
+ SET(TARGET_MAVEN_VERSION "1.0.0-cbas-dp3")
+
+ MAVEN_PROJECT (TARGET analytics-version-hyracks
+ GOAL ${VERSION_PLUGIN}:set
+ OPTS -B -f hyracks-fullstack/pom.xml -DnewVersion=${TARGET_MAVEN_VERSION})
+ MAVEN_PROJECT (TARGET analytics-version-asterixdb
+ GOAL ${VERSION_PLUGIN}:set
+ OPTS -B -f asterixdb/pom.xml -DnewVersion=${TARGET_MAVEN_VERSION})
+ MAVEN_PROJECT (TARGET analytics-version-asterix-opt
+ GOAL ${VERSION_PLUGIN}:set
+ OPTS -B -f asterixdb/asterix-opt/pom.xml
-DnewVersion=${TARGET_MAVEN_VERSION})
+ MAVEN_PROJECT (TARGET analytics-version-bom
+ GOAL ${VERSION_PLUGIN}:set
+ OPTS -B -f asterixdb/asterix-opt/asterix-opt-bom/pom.xml
-DnewVersion=${TARGET_MAVEN_VERSION})
+ MAVEN_PROJECT (TARGET analytics-update-properties
+ GOAL ${VERSION_PLUGIN}:update-properties
+ OPTS -B -DnewVersion=${TARGET_MAVEN_VERSION}
-DincludeProperties=algebricks.version,hyracks.version,asterix.version)
+ # Run the version updates one after another (hyracks, asterixdb, asterix-opt, bom, properties), all before analytics-maven-build
+ ADD_DEPENDENCIES(analytics-version-asterixdb-build
analytics-version-hyracks-build)
+ ADD_DEPENDENCIES(analytics-version-asterix-opt-build
analytics-version-asterixdb-build)
+ ADD_DEPENDENCIES(analytics-version-bom-build
analytics-version-asterix-opt-build)
+ ADD_DEPENDENCIES(analytics-update-properties-build
analytics-version-bom-build)
+ ADD_DEPENDENCIES(analytics-maven-build analytics-update-properties-build)
+
+ # Horrible hack for the DP3: copy the cbas-installer README and generated LICENSE over the packaging copies in couchdbx-app/makedmg and voltron (this writes into those source trees)
+ SET(LICENSE
${PROJECT_SOURCE_DIR}/analytics/asterixdb/asterix-opt/cbas-installer/target/generated-sources/LICENSE)
+ SET(README
${PROJECT_SOURCE_DIR}/analytics/asterixdb/asterix-opt/cbas-installer/src/main/README.txt)
+ ADD_CUSTOM_TARGET(analytics-readme-license-fixup
+ DEPENDS analytics-maven-build
+ COMMAND ${CMAKE_COMMAND} -E echo QQQQQQQQQQQ FIXING UP LICENSE / README
FILES FOR DP3
+ COMMAND ${CMAKE_COMMAND} -E copy_if_different ${README}
${PROJECT_SOURCE_DIR}/couchdbx-app/makedmg/README_for_zip.txt
+ COMMAND ${CMAKE_COMMAND} -E copy_if_different ${README}
${PROJECT_SOURCE_DIR}/couchdbx-app/makedmg/README.txt
+ COMMAND ${CMAKE_COMMAND} -E copy_if_different ${LICENSE}
${PROJECT_SOURCE_DIR}/couchdbx-app/makedmg/LICENSE.enterprise.txt
+ COMMAND ${CMAKE_COMMAND} -E copy_if_different ${README}
${PROJECT_SOURCE_DIR}/voltron/README_Linux.txt
+ COMMAND ${CMAKE_COMMAND} -E copy_if_different ${README}
${PROJECT_SOURCE_DIR}/voltron/README_Windows.txt
+ COMMAND ${CMAKE_COMMAND} -E copy_if_different ${LICENSE}
${PROJECT_SOURCE_DIR}/voltron/LICENSE-enterprise.txt)
+
+ ADD_DEPENDENCIES(analytics analytics-readme-license-fixup) # building "analytics" also runs the DP3 license/README fix-up
+ENDIF ()
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
index db26c3a..80fdbd6 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
@@ -186,8 +186,8 @@
private boolean addActiveReplica(String replica, ClusterPartition
partition,
Map<String, List<Integer>> partitionRecoveryPlan) {
- Map<String, Map<IOption, Object>> activeNcConfiguration =
clusterManager.getActiveNcConfiguration();
- if (activeNcConfiguration.containsKey(replica) &&
!failedNodes.contains(replica)) {
+ final Set<String> participantNodes =
clusterManager.getParticipantNodes();
+ if (participantNodes.contains(replica) &&
!failedNodes.contains(replica)) {
if (!partitionRecoveryPlan.containsKey(replica)) {
List<Integer> replicaPartitions = new ArrayList<>();
replicaPartitions.add(partition.getPartitionId());
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
index 0583508..84f841c 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
@@ -71,7 +71,7 @@
LOGGER.info("NC: " + nodeId + " joined");
}
IClusterStateManager csm = appCtx.getClusterStateManager();
- csm.addNCConfiguration(nodeId, ncConfiguration);
+ csm.notifyNodeJoin(nodeId, ncConfiguration);
//if metadata node rejoining, we need to rebind the proxy connection
when it is active again.
if (!csm.isMetadataNodeActive()) {
@@ -101,7 +101,7 @@
LOGGER.info("NC: " + deadNode + " left");
}
IClusterStateManager csm = appCtx.getClusterStateManager();
- csm.removeNCConfiguration(deadNode);
+ csm.notifyNodeFailure(deadNode);
//if metadata node failed, we need to rebind the proxy connection
when it is active again
if (!csm.isMetadataNodeActive()) {
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FaultToleranceUtil.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FaultToleranceUtil.java
index 241cd65..9887c57 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FaultToleranceUtil.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FaultToleranceUtil.java
@@ -20,6 +20,7 @@
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
@@ -48,15 +49,16 @@
List<String> primaryRemoteReplicas =
replicationStrategy.getRemotePrimaryReplicas(nodeId).stream()
.map(Replica::getId).collect(Collectors.toList());
String nodeIdAddress = StringUtils.EMPTY;
- Map<String, Map<IOption, Object>> activeNcConfiguration =
clusterManager.getActiveNcConfiguration();
+ Map<String, Map<IOption, Object>> ncConfiguration =
clusterManager.getNcConfiguration();
// In case the node joined with a new IP address, we need to send it
to the other replicas
if (event == ClusterEventType.NODE_JOIN) {
- nodeIdAddress =
(String)activeNcConfiguration.get(nodeId).get(NCConfig.Option.CLUSTER_PUBLIC_ADDRESS);
+ nodeIdAddress = (String)
ncConfiguration.get(nodeId).get(NCConfig.Option.CLUSTER_PUBLIC_ADDRESS);
}
+ final Set<String> participantNodes =
clusterManager.getParticipantNodes();
ReplicaEventMessage msg = new ReplicaEventMessage(nodeId,
nodeIdAddress, event);
for (String replica : primaryRemoteReplicas) {
// If the remote replica is alive, send the event
- if (activeNcConfiguration.containsKey(replica)) {
+ if (participantNodes.contains(replica)) {
try {
messageBroker.sendApplicationMessageToNC(msg, replica);
} catch (Exception e) {
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
new file mode 100644
index 0000000..7fc127b
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.asterix.app.replication.NoFaultToleranceStrategy;
+import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage;
+import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
+import org.apache.asterix.common.config.MetadataProperties;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.metadata.IMetadataBootstrap;
+import org.apache.asterix.runtime.transaction.ResourceIdManager;
+import org.apache.asterix.runtime.utils.CcApplicationContext;
+import org.apache.asterix.runtime.utils.ClusterStateManager;
+import org.apache.hyracks.api.application.ICCServiceContext;
+import org.apache.hyracks.api.config.IApplicationConfig;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.control.cc.application.CCServiceContext;
+import
org.apache.hyracks.control.common.application.ConfigManagerApplicationConfig;
+import org.apache.hyracks.control.common.config.ConfigManager;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class ClusterStateManagerTest {
+ // Node ids used by the tests; NC1 also serves as the metadata node.
+ private static final String NC1 = "NC1";
+ private static final String NC2 = "NC2";
+ private static final String NC3 = "NC3";
+ private static final String METADATA_NODE = NC1;
+
+ /**
+ * Ensures that a cluster with a fixed topology will not be active until
+ * all partitions are active.
+ *
+ * @throws Exception if a cluster state manager notification fails
+ */
+ @Test
+ public void fixedClusterTopologyState() throws Exception {
+ ClusterStateManager csm = new ClusterStateManager();
+ CcApplicationContext ccAppCtx = ccAppContext(csm);
+ // prepare fixed topology
+ ccAppCtx.getMetadataProperties().getClusterPartitions().put(0, new
ClusterPartition(0, NC1, 0));
+ ccAppCtx.getMetadataProperties().getClusterPartitions().put(1, new
ClusterPartition(1, NC2, 0));
+ ccAppCtx.getMetadataProperties().getClusterPartitions().put(2, new
ClusterPartition(2, NC3, 0));
+ for (ClusterPartition cp :
ccAppCtx.getMetadataProperties().getClusterPartitions().values()) {
+
ccAppCtx.getMetadataProperties().getNodePartitions().put(cp.getNodeId(), new
ClusterPartition[] { cp });
+ }
+ csm.setCcAppCtx(ccAppCtx);
+
+ // notify NC1 joined and completed startup
+ notifyNodeJoined(csm, NC1, 0, false);
+ notifyNodeStartupCompletion(ccAppCtx, NC1);
+ // cluster should be unusable
+ Assert.assertFalse(csm.isClusterActive());
+ // notify NC2 joined
+ notifyNodeJoined(csm, NC2, 1, false);
+ // notify NC3 joined
+ notifyNodeJoined(csm, NC3, 2, false);
+ // notify NC2 completed startup
+ notifyNodeStartupCompletion(ccAppCtx, NC2);
+ // cluster should still be unusable
+ Assert.assertFalse(csm.isClusterActive());
+ // notify NC3 completed startup
+ notifyNodeStartupCompletion(ccAppCtx, NC3);
+ // cluster should now be active
+ Assert.assertTrue(csm.isClusterActive());
+ // NC2 failed
+ csm.notifyNodeFailure(NC2);
+ // cluster should now be unusable
+ Assert.assertFalse(csm.isClusterActive());
+ }
+
+ /**
+ * Ensures that a cluster with a dynamic topology will not go into
unusable state while
+ * new partitions are dynamically added.
+ *
+ * @throws Exception if a cluster state manager notification fails
+ */
+ @Test
+ public void dynamicClusterTopologyState() throws Exception {
+ ClusterStateManager csm = new ClusterStateManager();
+ CcApplicationContext ccApplicationContext = ccAppContext(csm);
+ csm.setCcAppCtx(ccApplicationContext);
+
+ // notify NC1 joined and completed startup
+ notifyNodeJoined(csm, NC1, 0, true);
+ notifyNodeStartupCompletion(ccApplicationContext, NC1);
+ // cluster should now be active
+ Assert.assertTrue(csm.isClusterActive());
+ // notify NC2 joined
+ notifyNodeJoined(csm, NC2, 1, true);
+ // notify NC3 joined
+ notifyNodeJoined(csm, NC3, 2, true);
+ // cluster should still be active
+ Assert.assertTrue(csm.isClusterActive());
+ // notify NC2 completed startup
+ notifyNodeStartupCompletion(ccApplicationContext, NC2);
+ // cluster should still be active
+ Assert.assertTrue(csm.isClusterActive());
+ // notify NC3 completed startup
+ notifyNodeStartupCompletion(ccApplicationContext, NC3);
+ // cluster should still be active
+ Assert.assertTrue(csm.isClusterActive());
+ // NC2 failed
+ csm.notifyNodeFailure(NC2);
+ // cluster should now be unusable
+ Assert.assertFalse(csm.isClusterActive());
+ }
+
+ /**
+ * Ensures that a cluster with a dynamic topology will not go into
unusable state if
+ * a newly added node fails before completing its startup
+ *
+ * @throws Exception if a cluster state manager notification fails
+ */
+ @Test
+ public void dynamicClusterTopologyNodeFailure() throws Exception {
+ ClusterStateManager csm = new ClusterStateManager();
+ CcApplicationContext ccApplicationContext = ccAppContext(csm);
+ csm.setCcAppCtx(ccApplicationContext);
+
+ // notify NC1 joined and completed startup
+ notifyNodeJoined(csm, NC1, 0, true);
+ notifyNodeStartupCompletion(ccApplicationContext, NC1);
+ // cluster should now be active
+ Assert.assertTrue(csm.isClusterActive());
+ // notify NC2 joined
+ notifyNodeJoined(csm, NC2, 1, true);
+ // notify NC3 joined
+ notifyNodeJoined(csm, NC3, 2, true);
+ // cluster should still be active
+ Assert.assertTrue(csm.isClusterActive());
+ // notify NC2 completed startup
+ notifyNodeStartupCompletion(ccApplicationContext, NC2);
+ // cluster should still be active
+ Assert.assertTrue(csm.isClusterActive());
+ // NC3 failed before completing startup
+ csm.notifyNodeFailure(NC3);
+ // cluster should still be active
+ Assert.assertTrue(csm.isClusterActive());
+ }
+
+ private void notifyNodeJoined(ClusterStateManager csm, String nodeId, int
partitionId, boolean registerPartitions)
+ throws HyracksException, AsterixException {
+ csm.notifyNodeJoin(nodeId, Collections.emptyMap());
+ if (registerPartitions) {
+ csm.registerNodePartitions(nodeId, new ClusterPartition[] { new
ClusterPartition(partitionId, nodeId, 0) });
+ }
+ }
+
+ private void notifyNodeStartupCompletion(CcApplicationContext
applicationContext, String nodeId)
+ throws HyracksDataException {
+ NCLifecycleTaskReportMessage msg = new
NCLifecycleTaskReportMessage(nodeId, true);
+ applicationContext.getResourceIdManager().report(nodeId, 0); // ClusterStateManager checks every participant has reported a max resource id before activating
+ applicationContext.getFaultToleranceStrategy().process(msg);
+ }
+
+ private CcApplicationContext ccAppContext(ClusterStateManager csm) throws
HyracksDataException {
+ CcApplicationContext ccApplicationContext =
Mockito.mock(CcApplicationContext.class);
+ ConfigManager configManager = new ConfigManager(null);
+ IApplicationConfig applicationConfig = new
ConfigManagerApplicationConfig(configManager);
+ ICCServiceContext iccServiceContext =
Mockito.mock(CCServiceContext.class);
+
Mockito.when(iccServiceContext.getAppConfig()).thenReturn(applicationConfig);
+
Mockito.when(ccApplicationContext.getServiceContext()).thenReturn(iccServiceContext);
+
+ NoFaultToleranceStrategy fts = new NoFaultToleranceStrategy();
+ fts.bindTo(csm);
+
Mockito.when(ccApplicationContext.getFaultToleranceStrategy()).thenReturn(fts);
+
+ MetadataProperties metadataProperties = mockMetadataProperties();
+
Mockito.when(ccApplicationContext.getMetadataProperties()).thenReturn(metadataProperties);
+
+ ResourceIdManager resourceIdManager = new ResourceIdManager(csm);
+
Mockito.when(ccApplicationContext.getResourceIdManager()).thenReturn(resourceIdManager);
+
+ IMetadataBootstrap metadataBootstrap =
Mockito.mock(IMetadataBootstrap.class);
+ Mockito.doNothing().when(metadataBootstrap).init();
+
Mockito.when(ccApplicationContext.getMetadataBootstrap()).thenReturn(metadataBootstrap);
+
+ IGlobalRecoveryManager globalRecoveryManager =
Mockito.mock(IGlobalRecoveryManager.class);
+
Mockito.when(globalRecoveryManager.isRecoveryCompleted()).thenReturn(true);
+
Mockito.when(ccApplicationContext.getGlobalRecoveryManager()).thenReturn(globalRecoveryManager);
+ return ccApplicationContext;
+ }
+
+ private MetadataProperties mockMetadataProperties() {
+ SortedMap<Integer, ClusterPartition> clusterPartitions =
Collections.synchronizedSortedMap(new TreeMap<>());
+ Map<String, ClusterPartition[]> nodePartitionsMap = new
ConcurrentHashMap<>();
+ MetadataProperties metadataProperties =
Mockito.mock(MetadataProperties.class);
+
Mockito.when(metadataProperties.getMetadataNodeName()).thenReturn(METADATA_NODE);
+
Mockito.when(metadataProperties.getClusterPartitions()).thenReturn(clusterPartitions);
+
Mockito.when(metadataProperties.getNodePartitions()).thenReturn(nodePartitionsMap);
+ return metadataProperties;
+ }
+}
\ No newline at end of file
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
index cc27fbb..eae20f9 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
@@ -24,6 +24,8 @@
private final int ioDeviceNum;
private String activeNodeId = null;
private boolean active = false;
+ /* a flag indicating if the partition was dynamically added to the cluster
and pending first time activation */
+ private boolean pendingActivation = false;
public ClusterPartition(int partitionId, String nodeId, int ioDeviceNum) {
this.partitionId = partitionId;
@@ -55,6 +57,18 @@
this.active = active;
}
+ public boolean isActive() {
+ return active;
+ }
+
+ public boolean isPendingActivation() {
+ return pendingActivation;
+ }
+
+ public void setPendingActivation(boolean pendingActivation) {
+ this.pendingActivation = pendingActivation;
+ }
+
@Override
public ClusterPartition clone() {
return new ClusterPartition(partitionId, nodeId, ioDeviceNum);
@@ -68,9 +82,5 @@
sb.append(", IODevice: " + ioDeviceNum);
sb.append(", Active Node: " + activeNodeId);
return sb.toString();
- }
-
- public boolean isActive() {
- return active;
}
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index b368c3b..3948ea6 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -76,7 +76,7 @@
/**
* @return a map of nodeId and NC Configuration for active nodes.
*/
- Map<String, Map<IOption, Object>> getActiveNcConfiguration();
+ Map<String, Map<IOption, Object>> getNcConfiguration();
/**
* @return The current metadata node Id.
@@ -187,13 +187,13 @@
int getNumberOfNodes();
/**
- * Add node configuration
+ * Notifies {@link IClusterStateManager} that a node has joined
*
* @param nodeId
* @param ncConfiguration
* @throws HyracksException
*/
- void addNCConfiguration(String nodeId, Map<IOption, Object>
ncConfiguration) throws HyracksException;
+ void notifyNodeJoin(String nodeId, Map<IOption, Object> ncConfiguration)
throws HyracksException;
/**
* @return true if metadata node is active, false otherwise
@@ -201,12 +201,12 @@
boolean isMetadataNodeActive();
/**
- * Remove configuration of a dead node
+ * Notifies {@link IClusterStateManager} that a node has failed
*
* @param deadNode
* @throws HyracksException
*/
- void removeNCConfiguration(String deadNode) throws HyracksException;
+ void notifyNodeFailure(String deadNode) throws HyracksException;
/**
* @return a substitution node or null
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 334b683..ddbbc29 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -68,7 +68,7 @@
*/
private static final Logger LOGGER =
Logger.getLogger(ClusterStateManager.class.getName());
- private final Map<String, Map<IOption, Object>> activeNcConfiguration =
new HashMap<>();
+ private final Map<String, Map<IOption, Object>> ncConfig = new HashMap<>();
private Set<String> pendingRemoval = new HashSet<>();
private final Cluster cluster;
private ClusterState state = ClusterState.UNUSABLE;
@@ -78,6 +78,7 @@
private String currentMetadataNode = null;
private boolean metadataNodeActive = false;
private Set<String> failedNodes = new HashSet<>();
+ private Set<String> participantNodes = new HashSet<>();
private IFaultToleranceStrategy ftStrategy;
private ICcApplicationContext appCtx;
@@ -96,25 +97,25 @@
}
@Override
- public synchronized void removeNCConfiguration(String nodeId) throws
HyracksException {
+ public synchronized void notifyNodeFailure(String nodeId) throws
HyracksException {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Removing configuration parameters for node id " +
nodeId);
}
failedNodes.add(nodeId);
- ftStrategy.notifyNodeFailure(nodeId);
+ ncConfig.remove(nodeId);
pendingRemoval.remove(nodeId);
+ ftStrategy.notifyNodeFailure(nodeId);
}
@Override
- public synchronized void addNCConfiguration(String nodeId, Map<IOption,
Object> configuration)
- throws HyracksException {
+ public synchronized void notifyNodeJoin(String nodeId, Map<IOption,
Object> configuration) throws HyracksException {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Registering configuration parameters for node id " +
nodeId);
}
- activeNcConfiguration.put(nodeId, configuration);
failedNodes.remove(nodeId);
- ftStrategy.notifyNodeJoin(nodeId);
+ ncConfig.put(nodeId, configuration);
updateNodeConfig(nodeId, configuration);
+ ftStrategy.notifyNodeJoin(nodeId);
}
@Override
@@ -142,6 +143,11 @@
@Override
public synchronized void updateNodePartitions(String nodeId, boolean
active) throws HyracksDataException {
+ if (active) {
+ participantNodes.add(nodeId);
+ } else {
+ participantNodes.remove(nodeId);
+ }
ClusterPartition[] nodePartitions = node2PartitionsMap.get(nodeId);
// if this isn't a storage node, it will not have cluster partitions
if (nodePartitions != null) {
@@ -159,6 +165,7 @@
clusterPartition.setActive(active);
if (active) {
clusterPartition.setActiveNodeId(activeNode);
+ clusterPartition.setPendingActivation(false);
}
}
}
@@ -171,14 +178,15 @@
setState(ClusterState.UNUSABLE);
return;
}
- for (ClusterPartition p : clusterPartitions.values()) {
- if (!p.isActive()) {
- setState(ClusterState.UNUSABLE);
- return;
- }
+
+ // exclude partitions that are pending activation
+ if (clusterPartitions.values().stream().anyMatch(p -> !p.isActive() &&
!p.isPendingActivation())) {
+ setState(ClusterState.UNUSABLE);
+ return;
}
+
IResourceIdManager resourceIdManager = appCtx.getResourceIdManager();
- for (String node : activeNcConfiguration.keySet()) {
+ for (String node : participantNodes) {
if (!resourceIdManager.reported(node)) {
LOGGER.log(Level.INFO, "Partitions are ready but %s has not
yet registered its max resource id...",
node);
@@ -230,7 +238,7 @@
@Override
public synchronized String[] getIODevices(String nodeId) {
- Map<IOption, Object> ncConfig = activeNcConfiguration.get(nodeId);
+ Map<IOption, Object> ncConfig = this.ncConfig.get(nodeId);
if (ncConfig == null) {
if (LOGGER.isLoggable(Level.WARNING)) {
LOGGER.warning("Configuration parameters for nodeId " + nodeId
@@ -254,11 +262,7 @@
@Override
public synchronized Set<String> getParticipantNodes() {
- Set<String> participantNodes = new HashSet<>();
- for (String pNode : activeNcConfiguration.keySet()) {
- participantNodes.add(pNode);
- }
- return participantNodes;
+ return new HashSet<>(participantNodes);
}
@Override
@@ -299,8 +303,8 @@
}
@Override
- public int getNumberOfNodes() {
- return appCtx.getMetadataProperties().getNodeNames().size();
+ public synchronized int getNumberOfNodes() {
+ return participantNodes.size();
}
@Override
@@ -375,8 +379,8 @@
}
@Override
- public Map<String, Map<IOption, Object>> getActiveNcConfiguration() {
- return Collections.unmodifiableMap(activeNcConfiguration);
+ public Map<String, Map<IOption, Object>> getNcConfiguration() {
+ return Collections.unmodifiableMap(ncConfig);
}
@Override
@@ -398,6 +402,7 @@
}
}
for (ClusterPartition nodePartition : nodePartitions) {
+ nodePartition.setPendingActivation(true);
clusterPartitions.put(nodePartition.getPartitionId(),
nodePartition);
}
node2PartitionsMap.put(nodeId, nodePartitions);
@@ -423,7 +428,7 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Registering intention to remove node id " + nodeId);
}
- if (activeNcConfiguration.containsKey(nodeId)) {
+ if (participantNodes.contains(nodeId)) {
pendingRemoval.add(nodeId);
} else {
LOGGER.warning("Cannot register unknown node " + nodeId + " for
pending removal");
--
To view, visit https://asterix-gerrit.ics.uci.edu/2029
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I7a0db2d66cf44650dcc673b3f2de537816cb84c7
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <[email protected]>