Murtadha Hubail has submitted this change and it was merged.

Change subject: [ASTERIXDB-2107][CLUS] Prevent Invalid UNUSABLE State in Dynamic Topology
......................................................................
[ASTERIXDB-2107][CLUS] Prevent Invalid UNUSABLE State in Dynamic Topology

- user model changes: no
- storage format changes: no
- interface changes: yes
  Renamed the IClusterStateManager addNCConfiguration/removeNCConfiguration
  methods to notifyNodeJoin/notifyNodeFailure.

Details:
- Mark a node as a participant when it completes its startup, not when it
  joins the cluster.
- Allow partitions to be added in a pending-activation state.
- Stop using the static MetadataProperties to report the number of nodes.
- Add test cases.

Change-Id: I7a0db2d66cf44650dcc673b3f2de537816cb84c7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2029
Sonar-Qube: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Contrib: Jenkins <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: Michael Blow <[email protected]>
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FaultToleranceUtil.java
A asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
M asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/results/metadata_recovery/metadata_node_recovery/metadata_node_recovery.cluster_state.4.adm
M asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/results/metadata_recovery/metadata_node_recovery/metadata_node_recovery.cluster_state.7.adm
M asterixdb/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.cluster_state.10.adm
M asterixdb/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.cluster_state.5.adm
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
11 files changed, 320 insertions(+), 44 deletions(-)

Approvals:
  Anon. E. Moose #1000171:
  Jenkins: Verified; No violations found; ; Verified
  Michael Blow: Looks good to me, approved
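The renamed lifecycle notifications map onto the flow exercised by the new
ClusterStateManagerTest roughly as follows. This is a minimal illustrative sketch only, not
part of the patch: the node id "new_nc", the partition id, and the class name are made up,
and the ClusterStateManager is assumed to already be bound to a CC application context the
way the test's ccAppContext() helper sets it up.

    import java.util.Collections;

    import org.apache.asterix.common.cluster.ClusterPartition;
    import org.apache.asterix.runtime.utils.ClusterStateManager;

    public class DynamicTopologySketch {

        static void nodeLifecycle(ClusterStateManager csm) throws Exception {
            // 1. The node joins: its configuration is recorded, but it is not yet a participant.
            csm.notifyNodeJoin("new_nc", Collections.emptyMap());
            // 2. Its partitions are registered as pending activation, so they cannot pull the
            //    cluster into UNUSABLE while the node is still starting up.
            csm.registerNodePartitions("new_nc",
                    new ClusterPartition[] { new ClusterPartition(9, "new_nc", 0) });
            // 3. Once the node reports startup completion (normally driven by the
            //    fault-tolerance strategy), its partitions are activated and the node is
            //    counted as a participant.
            csm.updateNodePartitions("new_nc", true);
            // A failure between steps 2 and 3 (csm.notifyNodeFailure("new_nc")) no longer
            // drops the cluster into UNUSABLE; see the dynamicTopologyNodeFailure test below.
        }
    }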
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
index db26c3a..80fdbd6 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
@@ -186,8 +186,8 @@
     private boolean addActiveReplica(String replica, ClusterPartition partition,
             Map<String, List<Integer>> partitionRecoveryPlan) {
-        Map<String, Map<IOption, Object>> activeNcConfiguration = clusterManager.getActiveNcConfiguration();
-        if (activeNcConfiguration.containsKey(replica) && !failedNodes.contains(replica)) {
+        final Set<String> participantNodes = clusterManager.getParticipantNodes();
+        if (participantNodes.contains(replica) && !failedNodes.contains(replica)) {
             if (!partitionRecoveryPlan.containsKey(replica)) {
                 List<Integer> replicaPartitions = new ArrayList<>();
                 replicaPartitions.add(partition.getPartitionId());
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
index 0583508..84f841c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
@@ -71,7 +71,7 @@
             LOGGER.info("NC: " + nodeId + " joined");
         }
         IClusterStateManager csm = appCtx.getClusterStateManager();
-        csm.addNCConfiguration(nodeId, ncConfiguration);
+        csm.notifyNodeJoin(nodeId, ncConfiguration);
         //if metadata node rejoining, we need to rebind the proxy connection when it is active again.
         if (!csm.isMetadataNodeActive()) {
@@ -101,7 +101,7 @@
             LOGGER.info("NC: " + deadNode + " left");
         }
         IClusterStateManager csm = appCtx.getClusterStateManager();
-        csm.removeNCConfiguration(deadNode);
+        csm.notifyNodeFailure(deadNode);
         //if metadata node failed, we need to rebind the proxy connection when it is active again
         if (!csm.isMetadataNodeActive()) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FaultToleranceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FaultToleranceUtil.java
index 241cd65..9887c57 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FaultToleranceUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FaultToleranceUtil.java
@@ -20,6 +20,7 @@
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 import java.util.stream.Collectors;
@@ -48,15 +49,16 @@
         List<String> primaryRemoteReplicas = replicationStrategy.getRemotePrimaryReplicas(nodeId).stream()
                 .map(Replica::getId).collect(Collectors.toList());
         String nodeIdAddress = StringUtils.EMPTY;
-        Map<String, Map<IOption, Object>> activeNcConfiguration = clusterManager.getActiveNcConfiguration();
+        Map<String, Map<IOption, Object>> ncConfiguration = clusterManager.getNcConfiguration();
         // In case the node joined with a new IP address, we need to send it to the other replicas
         if (event == ClusterEventType.NODE_JOIN) {
-            nodeIdAddress = (String)activeNcConfiguration.get(nodeId).get(NCConfig.Option.CLUSTER_PUBLIC_ADDRESS);
+            nodeIdAddress = (String) ncConfiguration.get(nodeId).get(NCConfig.Option.CLUSTER_PUBLIC_ADDRESS);
         }
+        final Set<String> participantNodes = clusterManager.getParticipantNodes();
         ReplicaEventMessage msg = new ReplicaEventMessage(nodeId, nodeIdAddress, event);
         for (String replica : primaryRemoteReplicas) {
             // If the remote replica is alive, send the event
-            if (activeNcConfiguration.containsKey(replica)) {
+            if (participantNodes.contains(replica)) {
                 try {
                     messageBroker.sendApplicationMessageToNC(msg, replica);
                 } catch (Exception e) {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
new file mode 100644
index 0000000..6c11139
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.asterix.app.replication.NoFaultToleranceStrategy;
+import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage;
+import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
+import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
+import org.apache.asterix.common.config.MetadataProperties;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.metadata.IMetadataBootstrap;
+import org.apache.asterix.runtime.transaction.ResourceIdManager;
+import org.apache.asterix.runtime.utils.CcApplicationContext;
+import org.apache.asterix.runtime.utils.ClusterStateManager;
+import org.apache.hyracks.api.application.ICCServiceContext;
+import org.apache.hyracks.api.config.IApplicationConfig;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.control.cc.application.CCServiceContext;
+import org.apache.hyracks.control.common.application.ConfigManagerApplicationConfig;
+import org.apache.hyracks.control.common.config.ConfigManager;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class ClusterStateManagerTest {
+
+    private static final String NC1 = "NC1";
+    private static final String NC2 = "NC2";
+    private static final String NC3 = "NC3";
+    private static final String METADATA_NODE = NC1;
+
+    /**
+     * Ensures that a cluster with a fixed topology will not be active until
+     * all partitions are active.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void fixedTopologyState() throws Exception {
+        ClusterStateManager csm = new ClusterStateManager();
+        CcApplicationContext ccAppCtx = ccAppContext(csm);
+        // prepare fixed topology
+        ccAppCtx.getMetadataProperties().getClusterPartitions().put(0, new ClusterPartition(0, NC1, 0));
+        ccAppCtx.getMetadataProperties().getClusterPartitions().put(1, new ClusterPartition(1, NC2, 0));
+        ccAppCtx.getMetadataProperties().getClusterPartitions().put(2, new ClusterPartition(2, NC3, 0));
+        for (ClusterPartition cp : ccAppCtx.getMetadataProperties().getClusterPartitions().values()) {
+            ccAppCtx.getMetadataProperties().getNodePartitions().put(cp.getNodeId(), new ClusterPartition[] { cp });
+        }
+        csm.setCcAppCtx(ccAppCtx);
+
+        // notify NC1 joined and completed startup
+        notifyNodeJoined(csm, NC1, 0, false);
+        notifyNodeStartupCompletion(ccAppCtx, NC1);
+        // cluster should be unusable
+        Assert.assertTrue(!csm.isClusterActive());
+        // notify NC2 joined
+        notifyNodeJoined(csm, NC2, 1, false);
+        // notify NC3 joined
+        notifyNodeJoined(csm, NC3, 2, false);
+        // notify NC2 completed startup
+        notifyNodeStartupCompletion(ccAppCtx, NC2);
+        // cluster should still be unusable
+        Assert.assertTrue(!csm.isClusterActive());
+        // notify NC3 completed startup
+        notifyNodeStartupCompletion(ccAppCtx, NC3);
+        // cluster should now be active
+        Assert.assertTrue(csm.isClusterActive());
+        // NC2 failed
+        csm.notifyNodeFailure(NC2);
+        // cluster should now be unusable
+        Assert.assertTrue(!csm.isClusterActive());
+    }
+
+    /**
+     * Ensures that a cluster with a dynamic topology will not go into unusable state while
+     * new partitions are dynamically added.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void dynamicTopologyState() throws Exception {
+        ClusterStateManager csm = new ClusterStateManager();
+        CcApplicationContext ccApplicationContext = ccAppContext(csm);
+        csm.setCcAppCtx(ccApplicationContext);
+
+        // notify NC1 joined and completed startup
+        notifyNodeJoined(csm, NC1, 0, true);
+        notifyNodeStartupCompletion(ccApplicationContext, NC1);
+        // cluster should now be active
+        Assert.assertTrue(csm.isClusterActive());
+        // notify NC2 joined
+        notifyNodeJoined(csm, NC2, 1, true);
+        // notify NC3 joined
+        notifyNodeJoined(csm, NC3, 2, true);
+        // cluster should still be active
+        Assert.assertTrue(csm.isClusterActive());
+        // notify NC2 completed startup
+        notifyNodeStartupCompletion(ccApplicationContext, NC2);
+        // cluster should still be active
+        Assert.assertTrue(csm.isClusterActive());
+        // notify NC3 completed startup
+        notifyNodeStartupCompletion(ccApplicationContext, NC3);
+        // cluster should still be active
+        Assert.assertTrue(csm.isClusterActive());
+        // NC2 failed
+        csm.notifyNodeFailure(NC2);
+        // cluster should now be unusable
+        Assert.assertTrue(!csm.isClusterActive());
+    }
+
+    /**
+     * Ensures that a cluster with a dynamic topology will not go into unusable state if
+     * a newly added node fails before completing its startup
+     *
+     * @throws Exception
+     */
+    @Test
+    public void dynamicTopologyNodeFailure() throws Exception {
+        ClusterStateManager csm = new ClusterStateManager();
+        CcApplicationContext ccApplicationContext = ccAppContext(csm);
+        csm.setCcAppCtx(ccApplicationContext);
+
+        // notify NC1 joined and completed startup
+        notifyNodeJoined(csm, NC1, 0, true);
+        notifyNodeStartupCompletion(ccApplicationContext, NC1);
+        // cluster should now be active
+        Assert.assertTrue(csm.isClusterActive());
+        // notify NC2 joined
+        notifyNodeJoined(csm, NC2, 1, true);
+        // notify NC3 joined
+        notifyNodeJoined(csm, NC3, 2, true);
+        // cluster should still be active
+        Assert.assertTrue(csm.isClusterActive());
+        // notify NC2 completed startup
+        notifyNodeStartupCompletion(ccApplicationContext, NC2);
+        // cluster should still be active
+        Assert.assertTrue(csm.isClusterActive());
+        // NC3 failed before completing startup
+        csm.notifyNodeFailure(NC3);
+        // cluster should still be active
+        Assert.assertTrue(csm.isClusterActive());
+    }
+
+    /**
+     * Ensures that a cluster with a dynamic topology will be in an unusable state
+     * if all partitions are pending activation
+     *
+     * @throws Exception
+     */
+    @Test
+    public void dynamicTopologyNoActivePartitions() throws Exception {
+        ClusterStateManager csm = new ClusterStateManager();
+        CcApplicationContext ccApplicationContext = ccAppContext(csm);
+        csm.setCcAppCtx(ccApplicationContext);
+
+        // notify NC1 joined
+        notifyNodeJoined(csm, NC1, 0, true);
+        // notify NC1 failed before completing startup
+        csm.notifyNodeFailure(NC1);
+        Assert.assertTrue(csm.getState() == ClusterState.UNUSABLE);
+    }
+
+    private void notifyNodeJoined(ClusterStateManager csm, String nodeId, int partitionId, boolean registerPartitions)
+            throws HyracksException, AsterixException {
+        csm.notifyNodeJoin(nodeId, Collections.emptyMap());
+        if (registerPartitions) {
+            csm.registerNodePartitions(nodeId, new ClusterPartition[] { new ClusterPartition(partitionId, nodeId, 0) });
+        }
+    }
+
+    private void notifyNodeStartupCompletion(CcApplicationContext applicationContext, String nodeId)
+            throws HyracksDataException {
+        NCLifecycleTaskReportMessage msg = new NCLifecycleTaskReportMessage(nodeId, true);
+        applicationContext.getResourceIdManager().report(nodeId, 0);
+        applicationContext.getFaultToleranceStrategy().process(msg);
+    }
+
+    private CcApplicationContext ccAppContext(ClusterStateManager csm) throws HyracksDataException {
+        CcApplicationContext ccApplicationContext = Mockito.mock(CcApplicationContext.class);
+        ConfigManager configManager = new ConfigManager(null);
+        IApplicationConfig applicationConfig = new ConfigManagerApplicationConfig(configManager);
+        ICCServiceContext iccServiceContext = Mockito.mock(CCServiceContext.class);
+        Mockito.when(iccServiceContext.getAppConfig()).thenReturn(applicationConfig);
+        Mockito.when(ccApplicationContext.getServiceContext()).thenReturn(iccServiceContext);
+
+        NoFaultToleranceStrategy fts = new NoFaultToleranceStrategy();
+        fts.bindTo(csm);
+        Mockito.when(ccApplicationContext.getFaultToleranceStrategy()).thenReturn(fts);
+
+        MetadataProperties metadataProperties = mockMetadataProperties();
+        Mockito.when(ccApplicationContext.getMetadataProperties()).thenReturn(metadataProperties);
+
+        ResourceIdManager resourceIdManager = new ResourceIdManager(csm);
+        Mockito.when(ccApplicationContext.getResourceIdManager()).thenReturn(resourceIdManager);
+
+        IMetadataBootstrap metadataBootstrap = Mockito.mock(IMetadataBootstrap.class);
+        Mockito.doNothing().when(metadataBootstrap).init();
+        Mockito.when(ccApplicationContext.getMetadataBootstrap()).thenReturn(metadataBootstrap);
+
+        IGlobalRecoveryManager globalRecoveryManager = Mockito.mock(IGlobalRecoveryManager.class);
+        Mockito.when(globalRecoveryManager.isRecoveryCompleted()).thenReturn(true);
+        Mockito.when(ccApplicationContext.getGlobalRecoveryManager()).thenReturn(globalRecoveryManager);
+        return ccApplicationContext;
+    }
+
+    private MetadataProperties mockMetadataProperties() {
+        SortedMap<Integer, ClusterPartition> clusterPartitions = Collections.synchronizedSortedMap(new TreeMap<>());
+        Map<String, ClusterPartition[]> nodePartitionsMap = new ConcurrentHashMap<>();
+        MetadataProperties metadataProperties = Mockito.mock(MetadataProperties.class);
+        Mockito.when(metadataProperties.getMetadataNodeName()).thenReturn(METADATA_NODE);
+        Mockito.when(metadataProperties.getClusterPartitions()).thenReturn(clusterPartitions);
+        Mockito.when(metadataProperties.getNodePartitions()).thenReturn(nodePartitionsMap);
+        return metadataProperties;
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
index cc27fbb..cc99421 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
@@ -24,6 +24,8 @@
     private final int ioDeviceNum;
     private String activeNodeId = null;
     private boolean active = false;
+    /* a flag indicating if the partition was dynamically added to the cluster and pending first time activation */
+    private boolean pendingActivation = false;
 
     public ClusterPartition(int partitionId, String nodeId, int ioDeviceNum) {
         this.partitionId = partitionId;
@@ -55,6 +57,18 @@
         this.active = active;
     }
 
+    public boolean isActive() {
+        return active;
+    }
+
+    public boolean isPendingActivation() {
+        return pendingActivation;
+    }
+
+    public void setPendingActivation(boolean pendingActivation) {
+        this.pendingActivation = pendingActivation;
+    }
+
     @Override
     public ClusterPartition clone() {
         return new ClusterPartition(partitionId, nodeId, ioDeviceNum);
@@ -67,10 +81,7 @@
         sb.append(", Original Node: " + nodeId);
         sb.append(", IODevice: " + ioDeviceNum);
         sb.append(", Active Node: " + activeNodeId);
+        sb.append(", Pending Activation: " + pendingActivation);
         return sb.toString();
-    }
-
-    public boolean isActive() {
-        return active;
     }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index b368c3b..3948ea6 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -76,7 +76,7 @@
     /**
      * @return a map of nodeId and NC Configuration for active nodes.
      */
-    Map<String, Map<IOption, Object>> getActiveNcConfiguration();
+    Map<String, Map<IOption, Object>> getNcConfiguration();
 
     /**
      * @return The current metadata node Id.
@@ -187,13 +187,13 @@
     int getNumberOfNodes();
 
     /**
-     * Add node configuration
+     * Notifies {@link IClusterStateManager} that a node has joined
      *
      * @param nodeId
     * @param ncConfiguration
     * @throws HyracksException
     */
-    void addNCConfiguration(String nodeId, Map<IOption, Object> ncConfiguration) throws HyracksException;
+    void notifyNodeJoin(String nodeId, Map<IOption, Object> ncConfiguration) throws HyracksException;
 
     /**
      * @return true if metadata node is active, false otherwise
@@ -201,12 +201,12 @@
     boolean isMetadataNodeActive();
 
     /**
-     * Remove configuration of a dead node
+     * Notifies {@link IClusterStateManager} that a node has failed
      *
      * @param deadNode
     * @throws HyracksException
     */
-    void removeNCConfiguration(String deadNode) throws HyracksException;
+    void notifyNodeFailure(String deadNode) throws HyracksException;
 
     /**
      * @return a substitution node or null
diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/results/metadata_recovery/metadata_node_recovery/metadata_node_recovery.cluster_state.4.adm b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/results/metadata_recovery/metadata_node_recovery/metadata_node_recovery.cluster_state.4.adm
index d6ea4b7..06e4d80 100644
--- a/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/results/metadata_recovery/metadata_node_recovery/metadata_node_recovery.cluster_state.4.adm
+++ b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/results/metadata_recovery/metadata_node_recovery/metadata_node_recovery.cluster_state.4.adm
@@ -6,6 +6,7 @@
     "nodeId" : "asterix_nc1",
     "activeNodeId" : "asterix_nc1",
     "active" : false,
+    "pendingActivation" : false,
     "iodeviceNum" : 0
   },
   "1" : {
@@ -13,6 +14,7 @@
     "nodeId" : "asterix_nc1",
     "activeNodeId" : "asterix_nc1",
     "active" : false,
+    "pendingActivation" : false,
     "iodeviceNum" : 1
   },
   "2" : {
@@ -20,6 +22,7 @@
     "nodeId" : "asterix_nc2",
     "activeNodeId" : "asterix_nc2",
     "active" : true,
+    "pendingActivation" : false,
     "iodeviceNum" : 0
   },
   "3" : {
@@ -27,6 +30,7 @@
     "nodeId" : "asterix_nc2",
     "activeNodeId" : "asterix_nc2",
     "active" : true,
+    "pendingActivation" : false,
     "iodeviceNum" : 1
   }
 },
diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/results/metadata_recovery/metadata_node_recovery/metadata_node_recovery.cluster_state.7.adm b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/results/metadata_recovery/metadata_node_recovery/metadata_node_recovery.cluster_state.7.adm
index 579caac..e0ec010 100644
--- a/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/results/metadata_recovery/metadata_node_recovery/metadata_node_recovery.cluster_state.7.adm
+++ b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/results/metadata_recovery/metadata_node_recovery/metadata_node_recovery.cluster_state.7.adm
@@ -6,6 +6,7 @@
     "nodeId" : "asterix_nc1",
     "activeNodeId" : "asterix_nc1",
     "active" : true,
+    "pendingActivation" : false,
     "iodeviceNum" : 0
   },
   "1" : {
@@ -13,6 +14,7 @@
     "nodeId" : "asterix_nc1",
     "activeNodeId" : "asterix_nc1",
     "active" : true,
+    "pendingActivation" : false,
     "iodeviceNum" : 1
   },
   "2" : {
@@ -20,6 +22,7 @@
     "nodeId" : "asterix_nc2",
     "activeNodeId" : "asterix_nc2",
     "active" : true,
+    "pendingActivation" : false,
     "iodeviceNum" : 0
   },
   "3" : {
@@ -27,6 +30,7 @@
     "nodeId" : "asterix_nc2",
     "activeNodeId" : "asterix_nc2",
     "active" : true,
+    "pendingActivation" : false,
     "iodeviceNum" : 1
   }
 },
diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.cluster_state.10.adm b/asterixdb/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.cluster_state.10.adm
index 579caac..e0ec010 100644
--- a/asterixdb/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.cluster_state.10.adm
+++ b/asterixdb/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.cluster_state.10.adm
@@ -6,6 +6,7 @@
     "nodeId" : "asterix_nc1",
     "activeNodeId" : "asterix_nc1",
     "active" : true,
+    "pendingActivation" : false,
     "iodeviceNum" : 0
   },
   "1" : {
@@ -13,6 +14,7 @@
     "nodeId" : "asterix_nc1",
     "activeNodeId" : "asterix_nc1",
     "active" : true,
+    "pendingActivation" : false,
     "iodeviceNum" : 1
   },
   "2" : {
@@ -20,6 +22,7 @@
     "nodeId" : "asterix_nc2",
     "activeNodeId" : "asterix_nc2",
     "active" : true,
+    "pendingActivation" : false,
     "iodeviceNum" : 0
   },
   "3" : {
@@ -27,6 +30,7 @@
     "nodeId" : "asterix_nc2",
     "activeNodeId" : "asterix_nc2",
     "active" : true,
+    "pendingActivation" : false,
     "iodeviceNum" : 1
   }
 },
diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.cluster_state.5.adm b/asterixdb/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.cluster_state.5.adm
index 5f58ff7..2de5a55 100644
--- a/asterixdb/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.cluster_state.5.adm
+++ b/asterixdb/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.cluster_state.5.adm
@@ -6,6 +6,7 @@
     "nodeId" : "asterix_nc1",
     "activeNodeId" : "asterix_nc2",
     "active" : true,
+    "pendingActivation" : false,
     "iodeviceNum" : 0
   },
   "1" : {
@@ -13,6 +14,7 @@
     "nodeId" : "asterix_nc1",
     "activeNodeId" : "asterix_nc2",
     "active" : true,
+    "pendingActivation" : false,
     "iodeviceNum" : 1
   },
   "2" : {
@@ -20,6 +22,7 @@
     "nodeId" : "asterix_nc2",
     "activeNodeId" : "asterix_nc2",
     "active" : true,
+    "pendingActivation" : false,
     "iodeviceNum" : 0
   },
   "3" : {
@@ -27,6 +30,7 @@
     "nodeId" : "asterix_nc2",
     "activeNodeId" : "asterix_nc2",
     "active" : true,
+    "pendingActivation" : false,
     "iodeviceNum" : 1
   }
 },
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 51c87b4..6e55fd2 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -68,7 +68,7 @@
      */
     private static final Logger LOGGER = Logger.getLogger(ClusterStateManager.class.getName());
 
-    private final Map<String, Map<IOption, Object>> activeNcConfiguration = new HashMap<>();
+    private final Map<String, Map<IOption, Object>> ncConfigMap = new HashMap<>();
     private Set<String> pendingRemoval = new HashSet<>();
     private final Cluster cluster;
     private ClusterState state = ClusterState.UNUSABLE;
@@ -78,6 +78,7 @@
     private String currentMetadataNode = null;
     private boolean metadataNodeActive = false;
     private Set<String> failedNodes = new HashSet<>();
+    private Set<String> participantNodes = new HashSet<>();
     private IFaultToleranceStrategy ftStrategy;
     private ICcApplicationContext appCtx;
@@ -96,25 +97,25 @@
     }
 
     @Override
-    public synchronized void removeNCConfiguration(String nodeId) throws HyracksException {
+    public synchronized void notifyNodeFailure(String nodeId) throws HyracksException {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Removing configuration parameters for node id " + nodeId);
         }
         failedNodes.add(nodeId);
-        ftStrategy.notifyNodeFailure(nodeId);
+        ncConfigMap.remove(nodeId);
         pendingRemoval.remove(nodeId);
+        ftStrategy.notifyNodeFailure(nodeId);
     }
 
     @Override
-    public synchronized void addNCConfiguration(String nodeId, Map<IOption, Object> configuration)
-            throws HyracksException {
+    public synchronized void notifyNodeJoin(String nodeId, Map<IOption, Object> configuration) throws HyracksException {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Registering configuration parameters for node id " + nodeId);
         }
-        activeNcConfiguration.put(nodeId, configuration);
         failedNodes.remove(nodeId);
-        ftStrategy.notifyNodeJoin(nodeId);
+        ncConfigMap.put(nodeId, configuration);
         updateNodeConfig(nodeId, configuration);
+        ftStrategy.notifyNodeJoin(nodeId);
     }
 
     @Override
@@ -142,6 +143,11 @@
 
     @Override
     public synchronized void updateNodePartitions(String nodeId, boolean active) throws HyracksDataException {
+        if (active) {
+            participantNodes.add(nodeId);
+        } else {
+            participantNodes.remove(nodeId);
+        }
         ClusterPartition[] nodePartitions = node2PartitionsMap.get(nodeId);
         // if this isn't a storage node, it will not have cluster partitions
         if (nodePartitions != null) {
@@ -159,6 +165,7 @@
             clusterPartition.setActive(active);
             if (active) {
                 clusterPartition.setActiveNodeId(activeNode);
+                clusterPartition.setPendingActivation(false);
             }
         }
     }
@@ -170,19 +177,22 @@
             return;
         }
         resetClusterPartitionConstraint();
-        if (clusterPartitions.isEmpty()) {
+        // if the cluster has no registered partitions or all partitions are pending activation -> UNUSABLE
+        if (clusterPartitions.isEmpty() || clusterPartitions.values().stream()
+                .allMatch(ClusterPartition::isPendingActivation)) {
             LOGGER.info("Cluster does not have any registered partitions");
             setState(ClusterState.UNUSABLE);
             return;
         }
-        for (ClusterPartition p : clusterPartitions.values()) {
-            if (!p.isActive()) {
-                setState(ClusterState.UNUSABLE);
-                return;
-            }
+
+        // exclude partitions that are pending activation
+        if (clusterPartitions.values().stream().anyMatch(p -> !p.isActive() && !p.isPendingActivation())) {
+            setState(ClusterState.UNUSABLE);
+            return;
         }
+
         IResourceIdManager resourceIdManager = appCtx.getResourceIdManager();
-        for (String node : activeNcConfiguration.keySet()) {
+        for (String node : participantNodes) {
             if (!resourceIdManager.reported(node)) {
                 LOGGER.log(Level.INFO, "Partitions are ready but %s has not yet registered its max resource id...",
                         node);
@@ -234,7 +244,7 @@
 
     @Override
     public synchronized String[] getIODevices(String nodeId) {
-        Map<IOption, Object> ncConfig = activeNcConfiguration.get(nodeId);
+        Map<IOption, Object> ncConfig = ncConfigMap.get(nodeId);
         if (ncConfig == null) {
             if (LOGGER.isLoggable(Level.WARNING)) {
                 LOGGER.warning("Configuration parameters for nodeId " + nodeId
@@ -258,20 +268,16 @@
 
     @Override
     public synchronized Set<String> getParticipantNodes() {
-        Set<String> participantNodes = new HashSet<>();
-        for (String pNode : activeNcConfiguration.keySet()) {
-            participantNodes.add(pNode);
-        }
-        return participantNodes;
+        return new HashSet<>(participantNodes);
     }
 
     @Override
     public synchronized Set<String> getParticipantNodes(boolean excludePendingRemoval) {
-        Set<String> participantNodes = getParticipantNodes();
+        final Set<String> participantNodesCopy = getParticipantNodes();
         if (excludePendingRemoval) {
-            participantNodes.removeAll(pendingRemoval);
+            participantNodesCopy.removeAll(pendingRemoval);
         }
-        return participantNodes;
+        return participantNodesCopy;
     }
 
     @Override
@@ -303,8 +309,8 @@
     }
 
     @Override
-    public int getNumberOfNodes() {
-        return appCtx.getMetadataProperties().getNodeNames().size();
+    public synchronized int getNumberOfNodes() {
+        return participantNodes.size();
     }
 
     @Override
@@ -379,8 +385,8 @@
     }
 
     @Override
-    public Map<String, Map<IOption, Object>> getActiveNcConfiguration() {
-        return Collections.unmodifiableMap(activeNcConfiguration);
+    public Map<String, Map<IOption, Object>> getNcConfiguration() {
+        return Collections.unmodifiableMap(ncConfigMap);
     }
 
     @Override
@@ -402,6 +408,7 @@
             }
         }
         for (ClusterPartition nodePartition : nodePartitions) {
+            nodePartition.setPendingActivation(true);
            clusterPartitions.put(nodePartition.getPartitionId(), nodePartition);
         }
         node2PartitionsMap.put(nodeId, nodePartitions);
@@ -419,6 +426,7 @@
             for (ClusterPartition nodePartition : nodePartitions) {
                 clusterPartitions.remove(nodePartition.getPartitionId());
             }
+            participantNodes.remove(nodeId);
         }
     }
 
@@ -427,7 +435,7 @@
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Registering intention to remove node id " + nodeId);
         }
-        if (activeNcConfiguration.containsKey(nodeId)) {
+        if (participantNodes.contains(nodeId)) {
             pendingRemoval.add(nodeId);
         } else {
             LOGGER.warning("Cannot register unknown node " + nodeId + " for pending removal");

--
To view, visit https://asterix-gerrit.ics.uci.edu/2029
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I7a0db2d66cf44650dcc673b3f2de537816cb84c7
Gerrit-PatchSet: 12
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <[email protected]>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Michael Blow <[email protected]>
Gerrit-Reviewer: Murtadha Hubail <[email protected]>
