Sanil15 commented on a change in pull request #1219: SAMZA-2373: Container Placement Service (core functionality) for container move and restart URL: https://github.com/apache/samza/pull/1219#discussion_r348859993
########## File path: samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java ########## @@ -161,4 +237,140 @@ void handleExpiredRequestWithHostAffinityEnabled(String processorId, String pref allocator.requestResource(processorId, ResourceRequestState.ANY_HOST); } } + + /** + * Registers a container placement action to move the running container to destination host, if destination host is same as the + * host on which container is running, container placement action is treated as a restart. + * + * When host affinity is disabled a move / restart is only allowed on ANY_HOST + * When host affinity is enabled move / restart is allowed on specific or ANY_HOST + * TODO: SAMZA-2378: Container Placements for Standby containers enabled jobs + * + * @param processorId logical id of the container 0, 1, 2 + * @param destinationHost host where container is desired to be moved, acceptable values of this param are any valid + * hostname or "ANY_HOST"(in this case the request is sent to resource manager for any host) + * @param containerAllocator to request physical resources + */ + public ContainerPlacementStatus registerContainerPlacementAction(String processorId, String destinationHost, + ContainerAllocator containerAllocator, Optional<Long> requestExpiry) { + LOG.info("Received ControlAction request to move or restart container with processor id {} to host {}", processorId, destinationHost); + ContainerPlacementStatus actionStatus = checkValidControlAction(processorId, destinationHost); + if (actionStatus.status == ContainerPlacementStatus.StatusCode.BAD_REQUEST) { + return actionStatus; + } + + SamzaResource currentResource = samzaApplicationState.runningProcessors.get(processorId); + LOG.info("Processor ID: {} matched a active container with deployment ID: {} running on host: {}", processorId, + currentResource.getContainerId(), currentResource.getHost()); + + if (destinationHost.equals(ANY_HOST) || !hostAffinityEnabled) { + LOG.info("Changing the 
requested host to {} because either it is requested or host affinity is disabled", + ResourceRequestState.ANY_HOST); + destinationHost = ANY_HOST; + } + + SamzaResourceRequest resourceRequest = containerAllocator.getResourceRequest(processorId, destinationHost); + ContainerPlacementMetadata actionMetaData = + new ContainerPlacementMetadata(processorId, currentResource.getContainerId(), currentResource.getHost(), + destinationHost, actionStatus, requestExpiry.isPresent() ? requestExpiry.get() : DEFAULT_CONTROL_ACTION_EXPIRY); + + // Record the resource request for monitoring + actionMetaData.setActionStatus(ContainerPlacementStatus.StatusCode.IN_PROGRESS); + actionMetaData.recordResourceRequest(resourceRequest); + actions.put(processorId, actionMetaData); + // note this also updates state.preferredHost count + containerAllocator.issueResourceRequest(resourceRequest); + LOG.info("Control action with metadata {} and issued a request for resources in progress", actionMetaData); + return actionStatus; + } + + public Optional<Long> getActionExpiryTimeout(String processorId) { + return this.actions.containsKey(processorId) ? Optional.of( + this.actions.get(processorId).getRequestActionExpiryTimeout()) : Optional.empty(); + } + + /** + * Handles the container allocation for an existing container placement action by issuing a stop on the active container and + * waiting for the active container to shutdown for a timeout of {@code actionMetaData#getRequestActionExpiryTimeout}. + * + * Case 1. If active container fails to stop mark the container placement action failed. + * Case 2. Otherwise once active container shuts down then issue a start for the container on the preferred host. 
+ * + * This method is invoked by the allocator thread which waits on notify from the thread issuing + * callbacks i.e {@link ContainerProcessManager} + */ + private void handleContainerAllocationForExistingControlAction(String processorId, ContainerAllocator allocator, + ResourceRequestState state, SamzaResourceRequest request, String preferredhost) { + // check if container is already dead without issuing a stop here, fail the move request + ContainerPlacementMetadata actionMetaData = getControlActionMetadata(processorId).get(); Review comment: this method is only called when the action metadata is present ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services