mynameborat commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r543305421



##########
File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomain.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import org.apache.samza.SamzaException;
+
+/**
+ * A fault domain is a set of hardware components that share a single point of failure.
+ * This class identifies the type (ex: rack) and ID (ex: rack ID) of the fault domain in question.
+ * A host can belong to multiple fault domains.
+ * A fault domain may have greater than or equal to 1 hosts.
+ * A cluster can comprise of hosts on multiple fault domains.
+ */
+public class FaultDomain {
+
+  private final FaultDomainType type;
+  private final String id;
+
+  public FaultDomain(FaultDomainType type, String id) {
+    if (type == null || id == null) {
+      throw new SamzaException("Fault domain type and ID cannot be null.");

Review comment:
       Can we be consistent and use `Preconditions.checkNotNull` or `Objects.requireNonNull`?
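
A minimal sketch of the constructor validation using `java.util.Objects.requireNonNull` (Guava's `Preconditions.checkNotNull` would look much the same). Note that `Objects.nonNull` only returns a boolean and never throws. The `RACK` constant is an assumption for illustration, and this version throws `NullPointerException` instead of `SamzaException`, so any caller relying on the latter would need to adjust.

```java
import java.util.Objects;

// Stand-in enum; the RACK constant is assumed for this sketch.
enum FaultDomainType { RACK }

class FaultDomain {
  private final FaultDomainType type;
  private final String id;

  FaultDomain(FaultDomainType type, String id) {
    // requireNonNull returns its argument, or throws NullPointerException with this message if it is null.
    this.type = Objects.requireNonNull(type, "Fault domain type cannot be null.");
    this.id = Objects.requireNonNull(id, "Fault domain ID cannot be null.");
  }

  FaultDomainType getType() {
    return type;
  }

  String getId() {
    return id;
  }
}
```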

##########
File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomain.java
##########
@@ -0,0 +1,67 @@
+package org.apache.samza.clustermanager;
+
+import org.apache.samza.SamzaException;
+
+/**
+ * A fault domain is a set of hardware components that share a single point of failure.
+ * This class identifies the type (ex: rack) and ID (ex: rack ID) of the fault domain in question.
+ * A host can belong to multiple fault domains.
+ * A fault domain may have greater than or equal to 1 hosts.
+ * A cluster can comprise of hosts on multiple fault domains.
+ */
+public class FaultDomain {

Review comment:
       Doesn't this class also need to override `equals` and `hashCode` for the fault domain checks?
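
A possible shape for this, sketched with a stand-in `FaultDomainType` whose `RACK` constant is assumed: equality is defined on both the type and the ID, and `hashCode` is kept consistent with `equals` so that `Set<FaultDomain>` comparisons and lookups work by value. The `FaultDomainChecks.hasSameFaultDomains` helper is hypothetical; it only shows how a host-level check could compare sets directly rather than comparing their `toString()` output.

```java
import java.util.Objects;
import java.util.Set;

// Stand-in enum; the RACK constant is assumed for this sketch.
enum FaultDomainType { RACK }

final class FaultDomain {
  private final FaultDomainType type;
  private final String id;

  FaultDomain(FaultDomainType type, String id) {
    this.type = Objects.requireNonNull(type, "type");
    this.id = Objects.requireNonNull(id, "id");
  }

  // Two fault domains are equal when both their type and their ID match.
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    FaultDomain that = (FaultDomain) o;
    return type == that.type && id.equals(that.id);
  }

  // Consistent with equals, so HashSet/HashMap treat equal fault domains as the same element.
  @Override
  public int hashCode() {
    return Objects.hash(type, id);
  }

  @Override
  public String toString() {
    return "FaultDomain{type=" + type + ", id='" + id + "'}";
  }
}

// Hypothetical helper: compares two hosts' fault domain sets by value.
final class FaultDomainChecks {
  private FaultDomainChecks() {
  }

  static boolean hasSameFaultDomains(Set<FaultDomain> host1Domains, Set<FaultDomain> host2Domains) {
    // Set.equals relies on the element equals/hashCode defined above.
    return host1Domains.equals(host2Domains);
  }
}
```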

##########
File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainType.java
##########
@@ -0,0 +1,23 @@
+package org.apache.samza.clustermanager;
+
+public enum FaultDomainType {

Review comment:
       nit: Javadocs
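
For example, the Javadoc might read along these lines. The `FaultDomain` Javadoc in this diff only mentions racks, so the single `RACK` constant below is an assumption.

```java
/**
 * The kind of fault domain that a {@code FaultDomain} describes, that is, which level of
 * hardware grouping shares a single point of failure.
 */
enum FaultDomainType {
  /** A group of hosts located in the same rack. */
  RACK
}
```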

##########
File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManager.java
##########
@@ -0,0 +1,57 @@
+package org.apache.samza.clustermanager;
+
+import java.util.Set;
+import org.apache.samza.annotation.InterfaceStability;
+
+/**
+ *  This interface gets fault domain information of all hosts that are running in the cluster,
+ *  from the cluster manager (Yarn/Kubernetes/etc.).
+ *  It also provides other functionality like exposing all the available fault domains, checking if two hosts belong to
+ *  the same fault domain, and getting the valid fault domains that a container can be placed on (for ex: based on standby constraints).
+ *  The host to fault domain map used here will always be cached and only updated in case the AM dies or an active
+ *  container is assigned to a host which is not in the map.
+ *  This is not thread-safe.
+ */
+@InterfaceStability.Unstable
+public interface FaultDomainManager {
+
+  /**
+   * This method returns all the last cached fault domain values in a cluster, for all hosts that are healthy, up and running.

Review comment:
       "Cached" seems like an implementation detail rather than an API detail.
   Do we require strong freshness? If so, the API should mandate that.
   
   If not, you should call out that freshness is an implementation detail and that the API doesn't mandate anything.

##########
File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
##########
@@ -170,6 +170,31 @@
    */
   public final AtomicInteger failedContainerPlacementActions = new AtomicInteger(0);
 
+  /**
+   * Number of fault domain aware container requests made for a container.
+   */
+  public final AtomicInteger hostToFaultDomainCacheUpdates = new AtomicInteger(0);

Review comment:
       Why is this metric part of `SamzaApplicationState`? Refer to the comment above on `FaultDomainManager`: whether or not to cache is an implementation detail, and this metric seems very specific to a particular `FaultDomainManager` implementation. Hence, it does not belong in `SamzaApplicationState`.

##########
File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
##########
@@ -381,6 +428,9 @@ public final void issueResourceRequest(SamzaResourceRequest request) {
     } else {
       state.preferredHostRequests.incrementAndGet();
     }
+    if (request.getFaultDomains() != null && !request.getFaultDomains().isEmpty()) {

Review comment:
       The `null` check seems redundant, since we already ensure that `faultDomains` is never `null` (it is either empty or populated).

##########
File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -156,6 +161,20 @@ public void handleContainerStopFail(String containerID, String resourceID,
     }
   }
 
+  /**
+   * This method gets the set of racks that the given active container's corresponding standby can be placed on.
+   * The set of racks returned is based on the set difference between the active container's racks,
+   * and all the available racks in the cluster based on the host to fault domain cache.
+   * @param host The hostname of the active container
+   * @return the set of racks on which this active container's standby can be scheduled
+   */
+  public Set<FaultDomain> getAllowedFaultDomainsForSchedulingStandbyContainer(String host) {
+    Set<FaultDomain> activeContainerRack = faultDomainManager.getFaultDomainOfHost(host);

Review comment:
       This potentially passes a nullable host into `getFaultDomainOfHost`. Can we add a null check to this method, or use an annotation to ensure that only non-null hosts can be passed in?

##########
File path: 
samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
##########
@@ -38,6 +38,9 @@
   private static final String CLUSTER_MANAGER_FACTORY = "samza.cluster-manager.factory";
   private static final String CLUSTER_MANAGER_FACTORY_DEFAULT = "org.apache.samza.job.yarn.YarnResourceManagerFactory";
 
+  private static final String FAULT_DOMAIN_MANAGER_FACTORY = "samza.fault-domain-manager.factory";
+  private static final String FAULT_DOMAIN_MANAGER_FACTORY_DEFAULT = "org.apache.samza.job.yarn.RackManagerFactory";

Review comment:
       should it be `YarnFaultDomainManagerFactory`?

##########
File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -361,8 +382,29 @@ private FailoverMetadata registerActiveContainerFailure(String activeContainerID
   }
 
   /**
-   * Check if matching this SamzaResourceRequest to the given resource, meets all standby-container container constraints.
+   * This method returns the active container host given a standby or active container ID.
    *
+   * @param containerID Standby or active container container ID
+   * @return The active container host
+   */
+  String getActiveContainerHost(String containerID) {
+    String activeContainerId = containerID;
+    if (StandbyTaskUtil.isStandbyContainer(containerID)) {
+      activeContainerId = StandbyTaskUtil.getActiveContainerId(containerID);
+    }
+    SamzaResource resource = samzaApplicationState.pendingProcessors.get(activeContainerId);
+    if (resource == null) {
+      resource = samzaApplicationState.runningProcessors.get(activeContainerId);
+    }
+    if (resource != null) {
+      return resource.getHost();
+    }
+    return null;

Review comment:
       Can we return an `Optional` instead of `null`? That would force callers to handle the case where no host exists for the given container ID.
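
A sketch of the `Optional`-returning variant. To keep it self-contained, the pending and running processor maps are hypothetical stand-ins that map an active container ID directly to a host (instead of to a `SamzaResource`), and the standby-to-active ID translation done via `StandbyTaskUtil` is assumed to have already happened.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class ActiveContainerHostLookup {
  // Hypothetical stand-ins for pendingProcessors / runningProcessors: active container ID -> host.
  final Map<String, String> pendingHosts = new HashMap<>();
  final Map<String, String> runningHosts = new HashMap<>();

  /**
   * Returns the host of the given active container, checking pending processors first and then
   * running ones, or Optional.empty() if the container is in neither.
   */
  Optional<String> getActiveContainerHost(String activeContainerId) {
    String host = pendingHosts.get(activeContainerId);
    if (host == null) {
      host = runningHosts.get(activeContainerId);
    }
    return Optional.ofNullable(host);
  }
}
```

Callers then have to decide explicitly what happens when no host is known (for example with `ifPresent` or `orElse`), rather than passing a possible `null` on to `getFaultDomainOfHost`.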

##########
File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManager.java
##########
@@ -0,0 +1,57 @@
+package org.apache.samza.clustermanager;
+
+import java.util.Set;
+import org.apache.samza.annotation.InterfaceStability;
+
+/**
+ *  This interface gets fault domain information of all hosts that are running in the cluster,
+ *  from the cluster manager (Yarn/Kubernetes/etc.).
+ *  It also provides other functionality like exposing all the available fault domains, checking if two hosts belong to
+ *  the same fault domain, and getting the valid fault domains that a container can be placed on (for ex: based on standby constraints).
+ *  The host to fault domain map used here will always be cached and only updated in case the AM dies or an active
+ *  container is assigned to a host which is not in the map.
+ *  This is not thread-safe.
+ */
+@InterfaceStability.Unstable
+public interface FaultDomainManager {
+
+  /**
+   * This method returns all the last cached fault domain values in a cluster, for all hosts that are healthy, up and running.
+   * @return a set of {@link FaultDomain}s
+   */
+  Set<FaultDomain> getAllFaultDomains();
+
+  /**
+   * This method returns the fault domain a particular host resides on based on the internal cache.
+   * @param host the host
+   * @return the {@link FaultDomain}
+   */
+  Set<FaultDomain> getFaultDomainOfHost(String host);

Review comment:
       nit: s/getFaultDomainOfHost/getFaultDomainsForHost

##########
File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
##########
@@ -63,7 +69,11 @@
   private final Instant requestTimestamp;
 
   public SamzaResourceRequest(int numCores, int memoryMB, String preferredHost, String processorId) {
-    this(numCores, memoryMB, preferredHost, processorId, Instant.now());
+    this(numCores, memoryMB, preferredHost, processorId, Instant.now(), null);

Review comment:
       Pass `ImmutableSet.of()` (or any other empty set) instead of `null`, and get rid of the `null` check in the other constructor.

##########
File path: 
samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnFaultDomainManager.java
##########
@@ -0,0 +1,120 @@
+package org.apache.samza.job.yarn;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.samza.SamzaException;
+import org.apache.samza.clustermanager.FaultDomain;
+import org.apache.samza.clustermanager.FaultDomainManager;
+import org.apache.samza.clustermanager.FaultDomainType;
+import org.apache.samza.clustermanager.SamzaApplicationState;
+
+/**
+ * This class functionality works with the assumption that the job.standbytasks.replication.factor is 2.
+ * For values greater than 2, it is possible that the standby containers could be on the same rack as the active, or the already existing standby racks.
+ */
+public class YarnFaultDomainManager implements FaultDomainManager {
+
+  private Multimap<String, FaultDomain> hostToRackMap;
+  private final SamzaApplicationState state;
+  private final YarnClientImpl yarnClient;
+
+  public YarnFaultDomainManager(SamzaApplicationState state) {
+    this.state = state;

Review comment:
       Refer to the comments on the metrics. Addressing those should remove this dependency from `YarnFaultDomainManager`. Do we see any other need for this dependency?
   I'd assume this takes a `MetricsRegistry` instead.
   
   context: `SamzaApplicationState`
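
A sketch of what moving the metric into the implementation could look like. `CounterRegistry` is a hypothetical one-method stand-in for Samza's `MetricsRegistry` (used only so the example compiles on its own), the loader `Supplier` stands in for the Yarn node-report query, fault domains are plain rack-ID strings, and the metric name is made up. The point is only that the manager registers and updates its own counter, so it no longer needs `SamzaApplicationState`.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Hypothetical stand-in for a metrics registry, reduced to the single call this sketch needs.
interface CounterRegistry {
  AtomicLong newCounter(String group, String name);
}

final class CachingFaultDomainManager {
  private final Supplier<Map<String, Set<String>>> hostToFaultDomainsLoader;
  private final AtomicLong cacheUpdates;
  private Map<String, Set<String>> hostToFaultDomains;

  CachingFaultDomainManager(Supplier<Map<String, Set<String>>> loader, CounterRegistry registry) {
    this.hostToFaultDomainsLoader = loader;
    // The implementation registers and owns its cache metric; no SamzaApplicationState needed.
    this.cacheUpdates = registry.newCounter("CachingFaultDomainManager", "host-to-fault-domain-cache-updates");
    this.hostToFaultDomains = loader.get();
  }

  Set<String> getFaultDomainsForHost(String host) {
    if (!hostToFaultDomains.containsKey(host)) {
      // Cache miss: reload the whole map and record the refresh.
      hostToFaultDomains = hostToFaultDomainsLoader.get();
      cacheUpdates.incrementAndGet();
    }
    // Return a copy; an unknown host (even after reloading) yields an empty set.
    return new HashSet<>(hostToFaultDomains.getOrDefault(host, Collections.emptySet()));
  }

  long getCacheUpdateCount() {
    return cacheUpdates.get();
  }
}
```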

##########
File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
##########
@@ -73,7 +83,25 @@ public SamzaResourceRequest(int numCores, int memoryMB, String preferredHost, St
     this.requestId = UUID.randomUUID().toString();
     this.processorId = processorId;
     this.requestTimestamp = requestTimestamp;
-    log.info("SamzaResourceRequest created for Processor ID: {} on host: {} at time: {} with Request ID: {}", this.processorId, this.preferredHost, this.requestTimestamp, this.requestId);
+    this.faultDomains = new HashSet<>();
+    log.info("SamzaResourceRequest created for Processor ID: {} on host: {} at time: {} with Request ID: {}, and the following list of fault domains: {}",
+            this.processorId, this.preferredHost, this.requestTimestamp, this.requestId, this.faultDomains);
+  }
+
+  public SamzaResourceRequest(int numCores, int memoryMB, String preferredHost, String processorId, Instant requestTimestamp, Set<FaultDomain> faultDomains) {
+    this.numCores = numCores;

Review comment:
       Can we add a `null` precondition check for `faultDomains`?
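
A sketch combining this with the earlier suggestion to pass an empty set instead of `null`: the short constructor delegates with `Collections.emptySet()`, and the full constructor rejects `null` up front. `ResourceRequestSketch` is a trimmed, hypothetical stand-in for `SamzaResourceRequest` that uses plain strings for fault domains and omits logging and the request ID.

```java
import java.time.Instant;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical, trimmed-down stand-in for SamzaResourceRequest; fault domains are plain strings here.
final class ResourceRequestSketch {
  private final int numCores;
  private final int memoryMB;
  private final String preferredHost;
  private final String processorId;
  private final Instant requestTimestamp;
  private final Set<String> faultDomains;

  ResourceRequestSketch(int numCores, int memoryMB, String preferredHost, String processorId) {
    // Delegate with an empty set instead of null, so faultDomains is never null.
    this(numCores, memoryMB, preferredHost, processorId, Instant.now(), Collections.emptySet());
  }

  ResourceRequestSketch(int numCores, int memoryMB, String preferredHost, String processorId,
      Instant requestTimestamp, Set<String> faultDomains) {
    // Fail fast on null instead of null-checking at every use site.
    Objects.requireNonNull(faultDomains, "faultDomains cannot be null; pass an empty set instead");
    this.numCores = numCores;
    this.memoryMB = memoryMB;
    this.preferredHost = preferredHost;
    this.processorId = processorId;
    this.requestTimestamp = requestTimestamp;
    // Unmodifiable defensive copy so later changes to the caller's set do not leak in.
    this.faultDomains = Collections.unmodifiableSet(new HashSet<>(faultDomains));
  }

  Set<String> getFaultDomains() {
    return faultDomains;
  }
}
```

With the invariant that `faultDomains` is never `null`, the check in `issueResourceRequest` could shrink to `!request.getFaultDomains().isEmpty()`.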

##########
File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
##########
@@ -109,15 +141,16 @@ public String toString() {
             ", requestId='" + requestId + '\'' +
             ", processorId=" + processorId +
             ", requestTimestampMs=" + requestTimestamp +
+            ", faultDomains=" + faultDomains.toString() +
             '}';
   }
 
-  /**
-   * Requests are ordered by the processor type and the time at which they were created.
-   * Requests with timestamps in the future for retries take less precedence than timestamps in the past or current.
-   * Otherwise, active processors take precedence over standby processors, regardless of timestamp.
-   * @param o the other
-   */
+    /**
+     * Requests are ordered by the processor type and the time at which they were created.
+     * Requests with timestamps in the future for retries take less precedence than timestamps in the past or current.
+     * Otherwise, active processors take precedence over standby processors, regardless of timestamp.
+     * @param o the other
+     */

Review comment:
       nit: fix indentation

##########
File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -156,6 +161,20 @@ public void handleContainerStopFail(String containerID, String resourceID,
     }
   }
 
+  /**
+   * This method gets the set of racks that the given active container's corresponding standby can be placed on.
+   * The set of racks returned is based on the set difference between the active container's racks,
+   * and all the available racks in the cluster based on the host to fault domain cache.
+   * @param host The hostname of the active container
+   * @return the set of racks on which this active container's standby can be scheduled
+   */
+  public Set<FaultDomain> getAllowedFaultDomainsForSchedulingStandbyContainer(String host) {
+    Set<FaultDomain> activeContainerRack = faultDomainManager.getFaultDomainOfHost(host);
+    Set<FaultDomain> standbyRacks = faultDomainManager.getAllFaultDomains();
+    standbyRacks.removeAll(activeContainerRack);
+    return standbyRacks;

Review comment:
       Can we rename the variables to refer to generic fault domains instead of racks, since that is what they represent?
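
A sketch of the method with fault-domain-neutral names. It also adds the null check suggested earlier for the host argument and copies the set before `removeAll`, so the manager's own set is never mutated. The `Supplier`/`Function` fields are hypothetical stand-ins for `faultDomainManager.getAllFaultDomains()` and `getFaultDomainOfHost(host)`, and fault domains are plain strings to keep the example self-contained.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;

final class StandbyPlacementSketch {
  // Hypothetical stand-ins for faultDomainManager.getAllFaultDomains() and getFaultDomainOfHost(host).
  private final Supplier<Set<String>> allFaultDomains;
  private final Function<String, Set<String>> faultDomainsOfHost;

  StandbyPlacementSketch(Supplier<Set<String>> allFaultDomains,
      Function<String, Set<String>> faultDomainsOfHost) {
    this.allFaultDomains = allFaultDomains;
    this.faultDomainsOfHost = faultDomainsOfHost;
  }

  /** Returns the fault domains a standby may use: all known fault domains minus the active container's. */
  Set<String> getAllowedFaultDomainsForSchedulingStandbyContainer(String activeContainerHost) {
    Objects.requireNonNull(activeContainerHost, "activeContainerHost cannot be null");
    Set<String> activeContainerFaultDomains = faultDomainsOfHost.apply(activeContainerHost);
    // Copy first so removeAll never mutates a set owned by the fault domain manager.
    Set<String> allowedStandbyFaultDomains = new HashSet<>(allFaultDomains.get());
    allowedStandbyFaultDomains.removeAll(activeContainerFaultDomains);
    return allowedStandbyFaultDomains;
  }
}
```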

##########
File path: 
samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnFaultDomainManager.java
##########
@@ -0,0 +1,120 @@
+package org.apache.samza.job.yarn;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.samza.SamzaException;
+import org.apache.samza.clustermanager.FaultDomain;
+import org.apache.samza.clustermanager.FaultDomainManager;
+import org.apache.samza.clustermanager.FaultDomainType;
+import org.apache.samza.clustermanager.SamzaApplicationState;
+
+/**
+ * This class functionality works with the assumption that the job.standbytasks.replication.factor is 2.
+ * For values greater than 2, it is possible that the standby containers could be on the same rack as the active, or the already existing standby racks.
+ */
+public class YarnFaultDomainManager implements FaultDomainManager {
+
+  private Multimap<String, FaultDomain> hostToRackMap;
+  private final SamzaApplicationState state;
+  private final YarnClientImpl yarnClient;
+
+  public YarnFaultDomainManager(SamzaApplicationState state) {
+    this.state = state;
+    this.yarnClient = new YarnClientImpl();
+    this.hostToRackMap = computeHostToFaultDomainMap();
+  }
+
+  @VisibleForTesting
+  YarnFaultDomainManager(SamzaApplicationState state, YarnClientImpl yarnClient, Multimap<String, FaultDomain> hostToRackMap) {
+    this.state = state;
+    this.yarnClient = yarnClient;
+    this.hostToRackMap = hostToRackMap;
+  }
+
+  /**
+   * This method returns all the last cached rack values in a cluster, for all hosts that are healthy, up and running.
+   * @return a set of {@link FaultDomain}s
+   */
+  @Override
+  public Set<FaultDomain> getAllFaultDomains() {
+    return new HashSet<>(hostToRackMap.values());
+  }
+
+  /**
+   * This method returns the rack a particular host resides on based on the internal cache.
+   * In case the rack of a host does not exist in this cache, we update the cache by computing the host to rack map again using Yarn.
+   * @param host the host
+   * @return the {@link FaultDomain}
+   */
+  @Override
+  public Set<FaultDomain> getFaultDomainOfHost(String host) {
+    if (!hostToRackMap.containsKey(host)) {
+      hostToRackMap = computeHostToFaultDomainMap();
+      state.hostToFaultDomainCacheUpdates.incrementAndGet();
+    }
+    return new HashSet<>(hostToRackMap.get(host));
+  }
+
+  /**
+   * This method checks if the two hostnames provided reside on the same rack.
+   * @param host1 hostname
+   * @param host2 hostname
+   * @return true if the hosts exist on the same rack
+   */
+  @Override
+  public boolean hasSameFaultDomains(String host1, String host2) {
+    if (!hostToRackMap.keySet().contains(host1) || !hostToRackMap.keySet().contains(host2)) {
+      hostToRackMap = computeHostToFaultDomainMap();
+      state.hostToFaultDomainCacheUpdates.incrementAndGet();
+    }
+    return hostToRackMap.get(host1).toString().equals(hostToRackMap.get(host2).toString());

Review comment:
       This should go away once `equals` and `hashCode` are implemented for `FaultDomain`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

