mynameborat commented on a change in pull request #1448: URL: https://github.com/apache/samza/pull/1448#discussion_r536479945
########## File path: samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java ########## @@ -236,11 +236,20 @@ public void start() { diagnosticsManager.get().start(); } + if (jobConfig.getApplicationMasterHighAvailabilityEnabled()) { + LOG.info( + "Set neededProcessors prior to starting clusterResourceManager because it gets running containres from prev attempts in AM HA."); + state.processorCount.set(state.jobModelManager.jobModel().getContainers().size()); + state.neededProcessors.set(state.jobModelManager.jobModel().getContainers().size()); + } + LOG.info("Starting the cluster resource manager"); clusterResourceManager.start(); - state.processorCount.set(state.jobModelManager.jobModel().getContainers().size()); - state.neededProcessors.set(state.jobModelManager.jobModel().getContainers().size()); + if (!jobConfig.getApplicationMasterHighAvailabilityEnabled()) { + state.processorCount.set(state.jobModelManager.jobModel().getContainers().size()); + state.neededProcessors.set(state.jobModelManager.jobModel().getContainers().size()); + } Review comment: If I am reading this right, we are doing the same thing regardless of whether it is enabled or not, except for the additional logging. 1. Why can't we move the initializations before `clusterResourceManager.start()` and consolidate this? ########## File path: samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java ########## @@ -403,29 +412,23 @@ public void onResourcesCompleted(List<SamzaResourceStatus> resourceStatuses) { @Override public void onStreamProcessorLaunchSuccess(SamzaResource resource) { - String containerId = resource.getContainerId(); - String containerHost = resource.getHost(); - - // 1. Obtain the processor ID for the pending container on this resource. - String processorId = getPendingProcessorId(containerId); - LOG.info("Successfully started Processor ID: {} on Container ID: {} on host: {}", - processorId, containerId, containerHost); - - // 2. 
Remove the container from the pending buffer and add it to the running buffer. Additionally, update the - // job-health metric. - if (processorId != null) { - LOG.info("Moving Processor ID: {} on Container ID: {} on host: {} from pending to running state.", - processorId, containerId, containerHost); - state.pendingProcessors.remove(processorId); - state.runningProcessors.put(processorId, resource); - if (state.neededProcessors.decrementAndGet() == 0) { - state.jobHealthy.set(true); - } - containerManager.handleContainerLaunchSuccess(processorId, containerHost); - } else { - LOG.warn("Did not find a pending Processor ID for Container ID: {} on host: {}. " + - "Ignoring invalid/redundant notification.", containerId, containerHost); + // Scenario 1: processor belongs to current attempt of the job. + // This means, the current AM had placed a request for the processor + // and hence containerId should be found in the pendingProcessor map + if (state.pendingProcessors.containsValue(resource)) { + handleNewProcessorLaunchSuccess(resource); + return; + } + // Scenario 2: Due to AM HA, processor could belong to the previous attempt of the job. + // This means, the current AM did not place a request for the processor as it was already running. + // Hence it will be in the runningProcessors map and not in the pendingProcessor Map + if (jobConfig.getApplicationMasterHighAvailabilityEnabled() && state.runningProcessors.containsValue(resource)) { + handleRunningProcessorLaunchSuccess(resource); + return; } + + LOG.warn("Did not find a pending Processor ID for Container ID: {} on host: {}. " + + "Ignoring invalid/redundant notification.", resource.getContainerId(), resource.getHost()); Review comment: I would suggest doing ``` if () else if() else ``` instead, as it eliminates the need to use multiple control statements (return) across the function and makes it more readable. It becomes tedious, especially if this function evolves and starts having updates to local state. 
########## File path: samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala ########## @@ -44,25 +45,26 @@ class SamzaYarnAppMasterLifecycle(containerMem: Int, containerCpu: Int, samzaApp var validResourceRequest = true var shutdownMessage: String = null var webApp: SamzaYarnAppMasterService = null - def onInit() { + def onInit(): util.Map[ContainerId, YarnContainer] = { val host = state.nodeHost val response = amClient.registerApplicationMaster(host, state.rpcUrl.getPort, "%s:%d" format (host, state.trackingUrl.getPort)) // validate that the YARN cluster can handle our container resource requirements val maxCapability = response.getMaximumResourceCapability val maxMem = maxCapability.getMemory val maxCpu = maxCapability.getVirtualCores + val previousAttemptContainers = new HashMap[ContainerId, YarnContainer]() if (isApplicationMasterHighAvailabilityEnabled) { val yarnIdToprocIdMap = new HashMap[String, String]() samzaAppState.processorToExecutionId.asScala foreach { entry => yarnIdToprocIdMap.put(entry._2, entry._1) } response.getContainersFromPreviousAttempts.asScala foreach { (ctr: Container) => val samzaProcId = yarnIdToprocIdMap.get(ctr.getId.toString) - if (samzaProcId != null) { - info("Received container from previous attempt with samza processor id %s and yarn container id %s" format(samzaProcId, ctr.getId.toString)) - samzaAppState.runningProcessors.put(samzaProcId, - new SamzaResource(ctr.getResource.getVirtualCores, ctr.getResource.getMemory, ctr.getNodeId.getHost, ctr.getId.toString)) - state.runningProcessors.put(samzaProcId, new YarnContainer(ctr)) - } + info("Received container from previous attempt with samza processor id %s and yarn container id %s" format(samzaProcId, ctr.getId.toString)) + samzaAppState.runningProcessors.put(samzaProcId, + new SamzaResource(ctr.getResource.getVirtualCores, ctr.getResource.getMemory, ctr.getNodeId.getHost, ctr.getId.toString)) Review comment: You don't need to populate the 
runningProcessors in the new flow, right? If you were to populate the `pendingProcessors` instead, then with the new flow `YarnClusterResourceManager` would invoke a callback on `clusterManagerCallback.onStreamProcessorLaunchSuccess`. By doing so, you don't need to modify any of the `ContainerPlacementManager`'s code, and the job-health update, `pendingProcessor` decrement, and `runningProcessor` updates should be available for free. ########## File path: samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java ########## @@ -653,6 +656,20 @@ private String getPendingProcessorId(String containerId) { return null; } + /** + * Obtains the ID of the processor which is already running at a resource. + * @param resource where the processor is running + * @return the logical processorId of the processor (e.g., 0, 1, 2 ..) + */ + private String getRunningProcessorId(SamzaResource resource) { + return state.runningProcessors.entrySet() + .stream() + .filter(e -> e.getValue().equals(resource)) + .map(Map.Entry::getKey) + .findFirst() + .orElse(null); + } + Review comment: Why not use `getRunningProcessor` below by just using `getRunningProcessorId(resource.getContainerId())` instead? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org