Murtadha Hubail has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2029

Change subject: [ASTERIXDB-2107][CLUS] Prevent Invalid UNUSABLE State in 
Dynamic Topology
......................................................................

[ASTERIXDB-2107][CLUS] Prevent Invalid UNUSABLE State in Dynamic Topology

- user model changes: no
- storage format changes: no
- interface changes: yes
  Renamed IClusterStateManager addNCConfiguration/removeNCConfiguration
  to notifyNodeJoin/notifyNodeFailure, and getActiveNcConfiguration
  to getNcConfiguration.

Details:
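- Mark a node as a participant when it completes its startup
  rather than when it joins the cluster.
- Allow dynamically added partitions to be registered in a pending
  activation state, so they do not make the cluster UNUSABLE before
  their first activation.
- Report the number of nodes from the participant nodes instead of
  the static MetadataProperties node names.
- Add ClusterStateManagerTest test cases for fixed and dynamic topologies.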

Change-Id: I7a0db2d66cf44650dcc673b3f2de537816cb84c7
---
A CMakeLists.txt
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FaultToleranceUtil.java
A 
asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
8 files changed, 339 insertions(+), 40 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/29/2029/1

diff --git a/CMakeLists.txt b/CMakeLists.txt
new file mode 100644
index 0000000..69e4321
--- /dev/null
+++ b/CMakeLists.txt
@@ -0,0 +1,63 @@
+#
+# Copyright 2017 Couchbase, Inc.
+#
+# "Top-level" CMakeLists.txt for Couchbase Analytics
+#
+# This CMakeLists.txt is intended to sit at the top-level
+# asterixdb/ directory! The repo manifest will copy it there.
+
+SET (_all "")  # "analytics" is attached to the ALL target only when CB_INVOKE_MAVEN is set
+IF (CB_INVOKE_MAVEN)
+  SET (_all "ALL")
+ENDIF ()
+ADD_CUSTOM_TARGET(analytics ${_all}
+  DEPENDS analytics-maven  # NOTE(review): later rules use analytics-maven-build; confirm MAVEN_PROJECT defines both names
+  COMMAND ${CMAKE_COMMAND} -E echo QQQQQQQQQQQ FINISHED ANALYTICS)  # build-log marker printed after analytics-maven
+
+# Create target to build Analytics: package :cbas-installer plus the modules it needs (-am), skipping tests and the RAT check
+MAVEN_PROJECT (TARGET analytics-maven
+  GOAL package
+  OPTS -B -pl :cbas-installer -am -DskipTests 
"-DcbasInstallationTarget=${CMAKE_INSTALL_PREFIX}/lib" -Drat.skip)
+
+# Production builds only: stamp TARGET_MAVEN_VERSION into the Maven projects before the main Maven build runs
+IF ("$ENV{PRODUCT}" STREQUAL "server-analytics")  # update this test once 
MB-25715 is addressed
+  SET(VERSION_PLUGIN "org.codehaus.mojo:versions-maven-plugin:2.4")
+  SET(TARGET_MAVEN_VERSION "1.0.0-cbas-dp3")  # version written into every pom below
+  # One versions-maven-plugin "set" run per pom, then "update-properties" for the *.version properties
+  MAVEN_PROJECT (TARGET analytics-version-hyracks
+    GOAL ${VERSION_PLUGIN}:set
+    OPTS -B -f hyracks-fullstack/pom.xml -DnewVersion=${TARGET_MAVEN_VERSION})
+  MAVEN_PROJECT (TARGET analytics-version-asterixdb
+    GOAL ${VERSION_PLUGIN}:set
+    OPTS -B -f asterixdb/pom.xml -DnewVersion=${TARGET_MAVEN_VERSION})
+  MAVEN_PROJECT (TARGET analytics-version-asterix-opt
+    GOAL ${VERSION_PLUGIN}:set
+    OPTS -B -f asterixdb/asterix-opt/pom.xml 
-DnewVersion=${TARGET_MAVEN_VERSION})
+  MAVEN_PROJECT (TARGET analytics-version-bom
+    GOAL ${VERSION_PLUGIN}:set
+    OPTS -B -f asterixdb/asterix-opt/asterix-opt-bom/pom.xml 
-DnewVersion=${TARGET_MAVEN_VERSION})
+  MAVEN_PROJECT (TARGET analytics-update-properties
+    GOAL ${VERSION_PLUGIN}:update-properties
+    OPTS -B -DnewVersion=${TARGET_MAVEN_VERSION} 
-DincludeProperties=algebricks.version,hyracks.version,asterix.version)
+  # Serialize the version steps (hyracks -> asterixdb -> asterix-opt -> bom -> properties) ahead of the main Maven build
+  ADD_DEPENDENCIES(analytics-version-asterixdb-build 
analytics-version-hyracks-build)
+  ADD_DEPENDENCIES(analytics-version-asterix-opt-build 
analytics-version-asterixdb-build)
+  ADD_DEPENDENCIES(analytics-version-bom-build 
analytics-version-asterix-opt-build)
+  ADD_DEPENDENCIES(analytics-update-properties-build 
analytics-version-bom-build)
+  ADD_DEPENDENCIES(analytics-maven-build analytics-update-properties-build)
+
+  # Horrible hack to get the license & readme content we need for the DP3 into 
the right place...
+  SET(LICENSE 
${PROJECT_SOURCE_DIR}/analytics/asterixdb/asterix-opt/cbas-installer/target/generated-sources/LICENSE)
+  SET(README 
${PROJECT_SOURCE_DIR}/analytics/asterixdb/asterix-opt/cbas-installer/src/main/README.txt)
+  ADD_CUSTOM_TARGET(analytics-readme-license-fixup
+    DEPENDS analytics-maven-build  # NOTE(review): ADD_CUSTOM_TARGET DEPENDS is documented for files/custom-command outputs; ADD_DEPENDENCIES orders targets — confirm
+    COMMAND ${CMAKE_COMMAND} -E echo QQQQQQQQQQQ FIXING UP LICENSE / README 
FILES FOR DP3
+    COMMAND ${CMAKE_COMMAND} -E copy_if_different ${README} 
${PROJECT_SOURCE_DIR}/couchdbx-app/makedmg/README_for_zip.txt
+    COMMAND ${CMAKE_COMMAND} -E copy_if_different ${README} 
${PROJECT_SOURCE_DIR}/couchdbx-app/makedmg/README.txt
+    COMMAND ${CMAKE_COMMAND} -E copy_if_different ${LICENSE} 
${PROJECT_SOURCE_DIR}/couchdbx-app/makedmg/LICENSE.enterprise.txt
+    COMMAND ${CMAKE_COMMAND} -E copy_if_different ${README} 
${PROJECT_SOURCE_DIR}/voltron/README_Linux.txt
+    COMMAND ${CMAKE_COMMAND} -E copy_if_different ${README} 
${PROJECT_SOURCE_DIR}/voltron/README_Windows.txt
+    COMMAND ${CMAKE_COMMAND} -E copy_if_different ${LICENSE} 
${PROJECT_SOURCE_DIR}/voltron/LICENSE-enterprise.txt)
+  # Make the top-level "analytics" target run the README/LICENSE copy step
+  ADD_DEPENDENCIES(analytics analytics-readme-license-fixup)
+ENDIF ()
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
index db26c3a..80fdbd6 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
@@ -186,8 +186,8 @@
 
     private boolean addActiveReplica(String replica, ClusterPartition 
partition,
             Map<String, List<Integer>> partitionRecoveryPlan) {
-        Map<String, Map<IOption, Object>> activeNcConfiguration = 
clusterManager.getActiveNcConfiguration();
-        if (activeNcConfiguration.containsKey(replica) && 
!failedNodes.contains(replica)) {
+        final Set<String> participantNodes = 
clusterManager.getParticipantNodes();
+        if (participantNodes.contains(replica) && 
!failedNodes.contains(replica)) {
             if (!partitionRecoveryPlan.containsKey(replica)) {
                 List<Integer> replicaPartitions = new ArrayList<>();
                 replicaPartitions.add(partition.getPartitionId());
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
index 0583508..84f841c 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
@@ -71,7 +71,7 @@
             LOGGER.info("NC: " + nodeId + " joined");
         }
         IClusterStateManager csm = appCtx.getClusterStateManager();
-        csm.addNCConfiguration(nodeId, ncConfiguration);
+        csm.notifyNodeJoin(nodeId, ncConfiguration);
 
         //if metadata node rejoining, we need to rebind the proxy connection 
when it is active again.
         if (!csm.isMetadataNodeActive()) {
@@ -101,7 +101,7 @@
                 LOGGER.info("NC: " + deadNode + " left");
             }
             IClusterStateManager csm = appCtx.getClusterStateManager();
-            csm.removeNCConfiguration(deadNode);
+            csm.notifyNodeFailure(deadNode);
 
             //if metadata node failed, we need to rebind the proxy connection 
when it is active again
             if (!csm.isMetadataNodeActive()) {
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FaultToleranceUtil.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FaultToleranceUtil.java
index 241cd65..9887c57 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FaultToleranceUtil.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FaultToleranceUtil.java
@@ -20,6 +20,7 @@
 
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 import java.util.stream.Collectors;
@@ -48,15 +49,16 @@
         List<String> primaryRemoteReplicas = 
replicationStrategy.getRemotePrimaryReplicas(nodeId).stream()
                 .map(Replica::getId).collect(Collectors.toList());
         String nodeIdAddress = StringUtils.EMPTY;
-        Map<String, Map<IOption, Object>> activeNcConfiguration = 
clusterManager.getActiveNcConfiguration();
+        Map<String, Map<IOption, Object>> ncConfiguration = 
clusterManager.getNcConfiguration();
         // In case the node joined with a new IP address, we need to send it 
to the other replicas
         if (event == ClusterEventType.NODE_JOIN) {
-            nodeIdAddress = 
(String)activeNcConfiguration.get(nodeId).get(NCConfig.Option.CLUSTER_PUBLIC_ADDRESS);
+            nodeIdAddress = (String) 
ncConfiguration.get(nodeId).get(NCConfig.Option.CLUSTER_PUBLIC_ADDRESS);
         }
+        final Set<String> participantNodes = 
clusterManager.getParticipantNodes();
         ReplicaEventMessage msg = new ReplicaEventMessage(nodeId, 
nodeIdAddress, event);
         for (String replica : primaryRemoteReplicas) {
             // If the remote replica is alive, send the event
-            if (activeNcConfiguration.containsKey(replica)) {
+            if (participantNodes.contains(replica)) {
                 try {
                     messageBroker.sendApplicationMessageToNC(msg, replica);
                 } catch (Exception e) {
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
new file mode 100644
index 0000000..7fc127b
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.asterix.app.replication.NoFaultToleranceStrategy;
+import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage;
+import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
+import org.apache.asterix.common.config.MetadataProperties;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.metadata.IMetadataBootstrap;
+import org.apache.asterix.runtime.transaction.ResourceIdManager;
+import org.apache.asterix.runtime.utils.CcApplicationContext;
+import org.apache.asterix.runtime.utils.ClusterStateManager;
+import org.apache.hyracks.api.application.ICCServiceContext;
+import org.apache.hyracks.api.config.IApplicationConfig;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.control.cc.application.CCServiceContext;
+import org.apache.hyracks.control.common.application.ConfigManagerApplicationConfig;
+import org.apache.hyracks.control.common.config.ConfigManager;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class ClusterStateManagerTest {
+
+    private static final String NC1 = "NC1";
+    private static final String NC2 = "NC2";
+    private static final String NC3 = "NC3";
+    private static final String METADATA_NODE = NC1;
+
+    /**
+     * Ensures that a cluster with a fixed topology will not be active until
+     * all partitions are active.
+     *
+     * @throws Exception if the cluster state manager fails to process a node event
+     */
+    @Test
+    public void fixedClusterTopologyState() throws Exception {
+        ClusterStateManager csm = new ClusterStateManager();
+        CcApplicationContext ccAppCtx = ccAppContext(csm);
+        // prepare fixed topology: one partition per node, registered before any node joins
+        ccAppCtx.getMetadataProperties().getClusterPartitions().put(0, new ClusterPartition(0, NC1, 0));
+        ccAppCtx.getMetadataProperties().getClusterPartitions().put(1, new ClusterPartition(1, NC2, 0));
+        ccAppCtx.getMetadataProperties().getClusterPartitions().put(2, new ClusterPartition(2, NC3, 0));
+        for (ClusterPartition cp : ccAppCtx.getMetadataProperties().getClusterPartitions().values()) {
+            ccAppCtx.getMetadataProperties().getNodePartitions().put(cp.getNodeId(), new ClusterPartition[] { cp });
+        }
+        csm.setCcAppCtx(ccAppCtx);
+
+        // notify NC1 joined and completed startup
+        notifyNodeJoined(csm, NC1, 0, false);
+        notifyNodeStartupCompletion(ccAppCtx, NC1);
+        // cluster should be unusable
+        Assert.assertFalse(csm.isClusterActive());
+        // notify NC2 joined
+        notifyNodeJoined(csm, NC2, 1, false);
+        // notify NC3 joined
+        notifyNodeJoined(csm, NC3, 2, false);
+        // notify NC2 completed startup
+        notifyNodeStartupCompletion(ccAppCtx, NC2);
+        // cluster should still be unusable
+        Assert.assertFalse(csm.isClusterActive());
+        // notify NC3 completed startup
+        notifyNodeStartupCompletion(ccAppCtx, NC3);
+        // cluster should now be active
+        Assert.assertTrue(csm.isClusterActive());
+        // NC2 failed
+        csm.notifyNodeFailure(NC2);
+        // cluster should now be unusable
+        Assert.assertFalse(csm.isClusterActive());
+    }
+
+    /**
+     * Ensures that a cluster with a dynamic topology will not go into unusable state while
+     * new partitions are dynamically added.
+     *
+     * @throws Exception if the cluster state manager fails to process a node event
+     */
+    @Test
+    public void dynamicClusterTopologyState() throws Exception {
+        ClusterStateManager csm = new ClusterStateManager();
+        CcApplicationContext ccApplicationContext = ccAppContext(csm);
+        csm.setCcAppCtx(ccApplicationContext);
+
+        // notify NC1 joined and completed startup
+        notifyNodeJoined(csm, NC1, 0, true);
+        notifyNodeStartupCompletion(ccApplicationContext, NC1);
+        // cluster should now be active
+        Assert.assertTrue(csm.isClusterActive());
+        // notify NC2 joined
+        notifyNodeJoined(csm, NC2, 1, true);
+        // notify NC3 joined
+        notifyNodeJoined(csm, NC3, 2, true);
+        // cluster should still be active
+        Assert.assertTrue(csm.isClusterActive());
+        // notify NC2 completed startup
+        notifyNodeStartupCompletion(ccApplicationContext, NC2);
+        // cluster should still be active
+        Assert.assertTrue(csm.isClusterActive());
+        // notify NC3 completed startup
+        notifyNodeStartupCompletion(ccApplicationContext, NC3);
+        // cluster should still be active
+        Assert.assertTrue(csm.isClusterActive());
+        // NC2 failed
+        csm.notifyNodeFailure(NC2);
+        // cluster should now be unusable
+        Assert.assertFalse(csm.isClusterActive());
+    }
+
+    /**
+     * Ensures that a cluster with a dynamic topology will not go into unusable state if
+     * a newly added node fails before completing its startup
+     *
+     * @throws Exception if the cluster state manager fails to process a node event
+     */
+    @Test
+    public void dynamicClusterTopologyNodeFailure() throws Exception {
+        ClusterStateManager csm = new ClusterStateManager();
+        CcApplicationContext ccApplicationContext = ccAppContext(csm);
+        csm.setCcAppCtx(ccApplicationContext);
+
+        // notify NC1 joined and completed startup
+        notifyNodeJoined(csm, NC1, 0, true);
+        notifyNodeStartupCompletion(ccApplicationContext, NC1);
+        // cluster should now be active
+        Assert.assertTrue(csm.isClusterActive());
+        // notify NC2 joined
+        notifyNodeJoined(csm, NC2, 1, true);
+        // notify NC3 joined
+        notifyNodeJoined(csm, NC3, 2, true);
+        // cluster should still be active
+        Assert.assertTrue(csm.isClusterActive());
+        // notify NC2 completed startup
+        notifyNodeStartupCompletion(ccApplicationContext, NC2);
+        // cluster should still be active
+        Assert.assertTrue(csm.isClusterActive());
+        // NC3 failed before completing startup
+        csm.notifyNodeFailure(NC3);
+        // cluster should still be active
+        Assert.assertTrue(csm.isClusterActive());
+    }
+    // Joins nodeId (empty config); when registerPartitions is true, also registers one partition (io device 0) for it.
+    private void notifyNodeJoined(ClusterStateManager csm, String nodeId, int partitionId, boolean registerPartitions)
+            throws HyracksException, AsterixException {
+        csm.notifyNodeJoin(nodeId, Collections.emptyMap());
+        if (registerPartitions) {
+            csm.registerNodePartitions(nodeId, new ClusterPartition[] { new ClusterPartition(partitionId, nodeId, 0) });
+        }
+    }
+    // Simulates nodeId finishing startup: reports resource id 0 and feeds NCLifecycleTaskReportMessage(nodeId, true) to the FT strategy.
+    private void notifyNodeStartupCompletion(CcApplicationContext applicationContext, String nodeId)
+            throws HyracksDataException {
+        NCLifecycleTaskReportMessage msg = new NCLifecycleTaskReportMessage(nodeId, true);
+        applicationContext.getResourceIdManager().report(nodeId, 0);
+        applicationContext.getFaultToleranceStrategy().process(msg);
+    }
+    // Builds a mocked CcApplicationContext wired to csm (NoFaultToleranceStrategy, real ResourceIdManager, mocked metadata/recovery).
+    private CcApplicationContext ccAppContext(ClusterStateManager csm) throws HyracksDataException {
+        CcApplicationContext ccApplicationContext = Mockito.mock(CcApplicationContext.class);
+        ConfigManager configManager = new ConfigManager(null);
+        IApplicationConfig applicationConfig = new ConfigManagerApplicationConfig(configManager);
+        ICCServiceContext iccServiceContext = Mockito.mock(CCServiceContext.class);
+        Mockito.when(iccServiceContext.getAppConfig()).thenReturn(applicationConfig);
+        Mockito.when(ccApplicationContext.getServiceContext()).thenReturn(iccServiceContext);
+
+        NoFaultToleranceStrategy fts = new NoFaultToleranceStrategy();
+        fts.bindTo(csm);
+        Mockito.when(ccApplicationContext.getFaultToleranceStrategy()).thenReturn(fts);
+
+        MetadataProperties metadataProperties = mockMetadataProperties();
+        Mockito.when(ccApplicationContext.getMetadataProperties()).thenReturn(metadataProperties);
+
+        ResourceIdManager resourceIdManager = new ResourceIdManager(csm);
+        Mockito.when(ccApplicationContext.getResourceIdManager()).thenReturn(resourceIdManager);
+
+        IMetadataBootstrap metadataBootstrap = Mockito.mock(IMetadataBootstrap.class);
+        Mockito.doNothing().when(metadataBootstrap).init();
+        Mockito.when(ccApplicationContext.getMetadataBootstrap()).thenReturn(metadataBootstrap);
+
+        IGlobalRecoveryManager globalRecoveryManager = Mockito.mock(IGlobalRecoveryManager.class);
+        Mockito.when(globalRecoveryManager.isRecoveryCompleted()).thenReturn(true);
+        Mockito.when(ccApplicationContext.getGlobalRecoveryManager()).thenReturn(globalRecoveryManager);
+        return ccApplicationContext;
+    }
+    // Mocks MetadataProperties: METADATA_NODE as the metadata node, plus empty mutable partition maps.
+    private MetadataProperties mockMetadataProperties() {
+        SortedMap<Integer, ClusterPartition> clusterPartitions = Collections.synchronizedSortedMap(new TreeMap<>());
+        Map<String, ClusterPartition[]> nodePartitionsMap = new ConcurrentHashMap<>();
+        MetadataProperties metadataProperties = Mockito.mock(MetadataProperties.class);
+        Mockito.when(metadataProperties.getMetadataNodeName()).thenReturn(METADATA_NODE);
+        Mockito.when(metadataProperties.getClusterPartitions()).thenReturn(clusterPartitions);
+        Mockito.when(metadataProperties.getNodePartitions()).thenReturn(nodePartitionsMap);
+        return metadataProperties;
+    }
+}
\ No newline at end of file
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
index cc27fbb..eae20f9 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
@@ -24,6 +24,8 @@
     private final int ioDeviceNum;
     private String activeNodeId = null;
     private boolean active = false;
+    /* a flag indicating if the partition was dynamically added to the cluster 
and pending first time activation */
+    private boolean pendingActivation = false;
 
     public ClusterPartition(int partitionId, String nodeId, int ioDeviceNum) {
         this.partitionId = partitionId;
@@ -55,6 +57,18 @@
         this.active = active;
     }
 
+    public boolean isActive() {
+        return active;
+    }
+
+    public boolean isPendingActivation() {
+        return pendingActivation;
+    }
+
+    public void setPendingActivation(boolean pendingActivation) {
+        this.pendingActivation = pendingActivation;
+    }
+
     @Override
     public ClusterPartition clone() {
         return new ClusterPartition(partitionId, nodeId, ioDeviceNum);
@@ -68,9 +82,5 @@
         sb.append(", IODevice: " + ioDeviceNum);
         sb.append(", Active Node: " + activeNodeId);
         return sb.toString();
-    }
-
-    public boolean isActive() {
-        return active;
     }
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index b368c3b..3948ea6 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -76,7 +76,7 @@
     /**
      * @return a map of nodeId and NC Configuration for active nodes.
      */
-    Map<String, Map<IOption, Object>> getActiveNcConfiguration();
+    Map<String, Map<IOption, Object>> getNcConfiguration();
 
     /**
      * @return The current metadata node Id.
@@ -187,13 +187,13 @@
     int getNumberOfNodes();
 
     /**
-     * Add node configuration
+     * Notifies {@link IClusterStateManager} that a node has joined
      *
      * @param nodeId
      * @param ncConfiguration
      * @throws HyracksException
      */
-    void addNCConfiguration(String nodeId, Map<IOption, Object> 
ncConfiguration) throws HyracksException;
+    void notifyNodeJoin(String nodeId, Map<IOption, Object> ncConfiguration) 
throws HyracksException;
 
     /**
      * @return true if metadata node is active, false otherwise
@@ -201,12 +201,12 @@
     boolean isMetadataNodeActive();
 
     /**
-     * Remove configuration of a dead node
+     * Notifies {@link IClusterStateManager} that a node has failed
      *
      * @param deadNode
      * @throws HyracksException
      */
-    void removeNCConfiguration(String deadNode) throws HyracksException;
+    void notifyNodeFailure(String deadNode) throws HyracksException;
 
     /**
      * @return a substitution node or null
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 334b683..ddbbc29 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -68,7 +68,7 @@
      */
 
     private static final Logger LOGGER = 
Logger.getLogger(ClusterStateManager.class.getName());
-    private final Map<String, Map<IOption, Object>> activeNcConfiguration = 
new HashMap<>();
+    private final Map<String, Map<IOption, Object>> ncConfig = new HashMap<>();
     private Set<String> pendingRemoval = new HashSet<>();
     private final Cluster cluster;
     private ClusterState state = ClusterState.UNUSABLE;
@@ -78,6 +78,7 @@
     private String currentMetadataNode = null;
     private boolean metadataNodeActive = false;
     private Set<String> failedNodes = new HashSet<>();
+    private Set<String> participantNodes = new HashSet<>();
     private IFaultToleranceStrategy ftStrategy;
     private ICcApplicationContext appCtx;
 
@@ -96,25 +97,25 @@
     }
 
     @Override
-    public synchronized void removeNCConfiguration(String nodeId) throws 
HyracksException {
+    public synchronized void notifyNodeFailure(String nodeId) throws 
HyracksException {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Removing configuration parameters for node id " + 
nodeId);
         }
         failedNodes.add(nodeId);
-        ftStrategy.notifyNodeFailure(nodeId);
+        ncConfig.remove(nodeId);
         pendingRemoval.remove(nodeId);
+        ftStrategy.notifyNodeFailure(nodeId);
     }
 
     @Override
-    public synchronized void addNCConfiguration(String nodeId, Map<IOption, 
Object> configuration)
-            throws HyracksException {
+    public synchronized void notifyNodeJoin(String nodeId, Map<IOption, 
Object> configuration) throws HyracksException {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Registering configuration parameters for node id " + 
nodeId);
         }
-        activeNcConfiguration.put(nodeId, configuration);
         failedNodes.remove(nodeId);
-        ftStrategy.notifyNodeJoin(nodeId);
+        ncConfig.put(nodeId, configuration);
         updateNodeConfig(nodeId, configuration);
+        ftStrategy.notifyNodeJoin(nodeId);
     }
 
     @Override
@@ -142,6 +143,11 @@
 
     @Override
     public synchronized void updateNodePartitions(String nodeId, boolean 
active) throws HyracksDataException {
+        if (active) {
+            participantNodes.add(nodeId);
+        } else {
+            participantNodes.remove(nodeId);
+        }
         ClusterPartition[] nodePartitions = node2PartitionsMap.get(nodeId);
         // if this isn't a storage node, it will not have cluster partitions
         if (nodePartitions != null) {
@@ -159,6 +165,7 @@
             clusterPartition.setActive(active);
             if (active) {
                 clusterPartition.setActiveNodeId(activeNode);
+                clusterPartition.setPendingActivation(false);
             }
         }
     }
@@ -171,14 +178,15 @@
             setState(ClusterState.UNUSABLE);
             return;
         }
-        for (ClusterPartition p : clusterPartitions.values()) {
-            if (!p.isActive()) {
-                setState(ClusterState.UNUSABLE);
-                return;
-            }
+
+        // exclude partitions that are pending activation
+        if (clusterPartitions.values().stream().anyMatch(p -> !p.isActive() && 
!p.isPendingActivation())) {
+            setState(ClusterState.UNUSABLE);
+            return;
         }
+
         IResourceIdManager resourceIdManager = appCtx.getResourceIdManager();
-        for (String node : activeNcConfiguration.keySet()) {
+        for (String node : participantNodes) {
             if (!resourceIdManager.reported(node)) {
                 LOGGER.log(Level.INFO, "Partitions are ready but %s has not 
yet registered its max resource id...",
                         node);
@@ -230,7 +238,7 @@
 
     @Override
     public synchronized String[] getIODevices(String nodeId) {
-        Map<IOption, Object> ncConfig = activeNcConfiguration.get(nodeId);
+        Map<IOption, Object> ncConfig = this.ncConfig.get(nodeId);
         if (ncConfig == null) {
             if (LOGGER.isLoggable(Level.WARNING)) {
                 LOGGER.warning("Configuration parameters for nodeId " + nodeId
@@ -254,11 +262,7 @@
 
     @Override
     public synchronized Set<String> getParticipantNodes() {
-        Set<String> participantNodes = new HashSet<>();
-        for (String pNode : activeNcConfiguration.keySet()) {
-            participantNodes.add(pNode);
-        }
-        return participantNodes;
+        return new HashSet<>(participantNodes);
     }
 
     @Override
@@ -299,8 +303,8 @@
     }
 
     @Override
-    public int getNumberOfNodes() {
-        return appCtx.getMetadataProperties().getNodeNames().size();
+    public synchronized int getNumberOfNodes() {
+        return participantNodes.size();
     }
 
     @Override
@@ -375,8 +379,8 @@
     }
 
     @Override
-    public Map<String, Map<IOption, Object>> getActiveNcConfiguration() {
-        return Collections.unmodifiableMap(activeNcConfiguration);
+    public Map<String, Map<IOption, Object>> getNcConfiguration() {
+        return Collections.unmodifiableMap(ncConfig);
     }
 
     @Override
@@ -398,6 +402,7 @@
             }
         }
         for (ClusterPartition nodePartition : nodePartitions) {
+            nodePartition.setPendingActivation(true);
             clusterPartitions.put(nodePartition.getPartitionId(), 
nodePartition);
         }
         node2PartitionsMap.put(nodeId, nodePartitions);
@@ -423,7 +428,7 @@
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Registering intention to remove node id " + nodeId);
         }
-        if (activeNcConfiguration.containsKey(nodeId)) {
+        if (participantNodes.contains(nodeId)) {
             pendingRemoval.add(nodeId);
         } else {
             LOGGER.warning("Cannot register unknown node " + nodeId + " for 
pending removal");

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2029
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I7a0db2d66cf44650dcc673b3f2de537816cb84c7
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <[email protected]>

Reply via email to