Sanil15 commented on a change in pull request #1219: SAMZA-2373: Container 
Placement Service (core functionality) for container move and restart
URL: https://github.com/apache/samza/pull/1219#discussion_r348859993
 
 

 ##########
 File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
 ##########
 @@ -161,4 +237,140 @@ void handleExpiredRequestWithHostAffinityEnabled(String 
processorId, String pref
       allocator.requestResource(processorId, ResourceRequestState.ANY_HOST);
     }
   }
+
+  /**
+   * Registers a container placement action to move the running container to 
destination host, if destination host is same as the
+   * host on which container is running, container placement action is treated 
as a restart.
+   *
+   * When host affinity is disabled a move / restart is only allowed on 
ANY_HOST
+   * When host affinity is enabled move / restart is allowed on specific or 
ANY_HOST
+   * TODO: SAMZA-2378: Container Placements for Standby containers enabled jobs
+   *
+   * @param processorId logical id of the container 0, 1, 2
+   * @param destinationHost host where container is desired to be moved, 
acceptable values of this param are any valid
+   *                        hostname or "ANY_HOST"(in this case the request is 
sent to resource manager for any host)
+   * @param containerAllocator to request physical resources
+   */
+  public ContainerPlacementStatus registerContainerPlacementAction(String 
processorId, String destinationHost,
+      ContainerAllocator containerAllocator, Optional<Long> requestExpiry) {
+    LOG.info("Received ControlAction request to move or restart container with 
processor id {} to host {}", processorId, destinationHost);
+    ContainerPlacementStatus actionStatus = 
checkValidControlAction(processorId, destinationHost);
+    if (actionStatus.status == 
ContainerPlacementStatus.StatusCode.BAD_REQUEST) {
+      return actionStatus;
+    }
+
+    SamzaResource currentResource = 
samzaApplicationState.runningProcessors.get(processorId);
+    LOG.info("Processor ID: {} matched a active container with deployment ID: 
{} running on host: {}", processorId,
+        currentResource.getContainerId(), currentResource.getHost());
+
+    if (destinationHost.equals(ANY_HOST) || !hostAffinityEnabled) {
+      LOG.info("Changing the requested host to {} because either it is 
requested or host affinity is disabled",
+          ResourceRequestState.ANY_HOST);
+      destinationHost = ANY_HOST;
+    }
+
+    SamzaResourceRequest resourceRequest = 
containerAllocator.getResourceRequest(processorId, destinationHost);
+    ContainerPlacementMetadata actionMetaData =
+        new ContainerPlacementMetadata(processorId, 
currentResource.getContainerId(), currentResource.getHost(),
+            destinationHost, actionStatus, requestExpiry.isPresent() ? 
requestExpiry.get() : DEFAULT_CONTROL_ACTION_EXPIRY);
+
+    // Record the resource request for monitoring
+    
actionMetaData.setActionStatus(ContainerPlacementStatus.StatusCode.IN_PROGRESS);
+    actionMetaData.recordResourceRequest(resourceRequest);
+    actions.put(processorId, actionMetaData);
+    // note this also updates state.preferredHost count
+    containerAllocator.issueResourceRequest(resourceRequest);
+    LOG.info("Control action with metadata {} and issued a request for 
resources in progress", actionMetaData);
+    return actionStatus;
+  }
+
+  public Optional<Long> getActionExpiryTimeout(String processorId) {
+    return this.actions.containsKey(processorId) ? Optional.of(
+        this.actions.get(processorId).getRequestActionExpiryTimeout()) : 
Optional.empty();
+  }
+
+  /**
+   * Handles the container allocation for an existing container placement 
action by issuing a stop on the active container and
+   * waiting for the active container to shutdown for a timeout of {@code 
actionMetaData#getRequestActionExpiryTimeout}.
+   *
+   * Case 1. If active container fails to stop mark the container placement 
action failed.
+   * Case 2. Otherwise once active container shuts down then issue a start for 
the container on the preferred host.
+   *
+   * This method is invoked by the allocator thread which waits on notify from 
the thread issuing
+   * callbacks i.e {@link ContainerProcessManager}
+   */
+  private void handleContainerAllocationForExistingControlAction(String 
processorId, ContainerAllocator allocator,
+      ResourceRequestState state, SamzaResourceRequest request, String 
preferredhost) {
+    // check if container is already dead without issuing a stop here, fail 
the move request
+    ContainerPlacementMetadata actionMetaData = 
getControlActionMetadata(processorId).get();
 
 Review comment:
   only called when is present

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to