rmatharu commented on a change in pull request #980: Cleaned up AM logs and
naming convention
URL: https://github.com/apache/samza/pull/980#discussion_r270516141
##########
File path:
samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
##########
@@ -121,56 +118,54 @@ public void run() {
}
/**
- * Assign resources from the cluster manager and matches them to run
container processes on them.
- *
+ * Assigns resources received from the cluster manager to processors.
*/
protected abstract void assignResourceRequests();
/**
- * Updates the request state and runs a container process on the specified
host. Assumes a resource
+ * Updates the request state and runs a processor on the specified host.
Assumes a resource
* is available on the preferred host, so the caller must verify that before
invoking this method.
*
* @param request the {@link SamzaResourceRequest} which is
being handled.
- * @param preferredHost the preferred host on which the
StreamProcessor process should be run or
+ * @param preferredHost the preferred host on which the processor
should be run or
* {@link ResourceRequestState#ANY_HOST} if there
is no host preference.
- * @throws
- * SamzaException if there is no allocated resource in the specified host.
+ * @throws SamzaException if there is no allocated
resource in the specified host.
*/
protected void runStreamProcessor(SamzaResourceRequest request, String
preferredHost) {
- CommandBuilder builder = getCommandBuilder(request.getContainerID());
+ CommandBuilder builder = getCommandBuilder(request.getProcessorId());
// Get the available resource
SamzaResource resource = peekAllocatedResource(preferredHost);
- if (resource == null)
- throw new SamzaException("Expected resource was unavailable on host " +
preferredHost);
+ if (resource == null) {
+ throw new SamzaException("Expected resource for Processor ID: " +
request.getProcessorId() + " was unavailable on host: " + preferredHost);
+ }
// Update state
resourceRequestState.updateStateAfterAssignment(request, preferredHost,
resource);
- String containerID = request.getContainerID();
+ String processorId = request.getProcessorId();
- //run container on resource
- log.info("Found available resources on {}. Assigning request for
container_id {} with "
- + "timestamp {} to resource {}",
- new Object[]{preferredHost, String.valueOf(containerID),
request.getRequestTimestampMs(), resource.getResourceID()});
+ // Run processor on resource
+ log.info("Found Container ID: {} for Processor ID: {} on host: {} for
request creation time: {}.",
+ resource.getContainerId(), processorId, preferredHost,
request.getRequestTimestampMs());
- // Update container state as "pending" and then issue a request to launch
it. It's important to perform the state-update
+ // Update processor state as "pending" and then issue a request to launch
it. It's important to perform the state-update
// prior to issuing the request. Otherwise, there's a race where the
response callback may arrive sooner and not see
- // the container as "pending" (SAMZA-2117)
+ // the processor as "pending" (SAMZA-2117)
- state.pendingContainers.put(containerID, resource);
+ state.pendingProcessors.put(processorId, resource);
clusterResourceManager.launchStreamProcessor(resource, builder);
}
/**
* Called during initial request for resources
*
- * @param resourceToHostMapping A Map of [containerId, hostName] containerId
is the ID of the container process
- * to run on the resource. hostName is the
host on which the resource must be allocated.
- * The hostName value is null, either
+ * @param processorToHostMapping A Map of [processorId, hostName], where
processorId is the ID of the Samza processor
+ * to run on the resource. hostName is the
host on which the resource should be allocated.
+ * The hostName value is null, either
* - when host-affinity has never been
enabled, or
* - when host-affinity is enabled and job is
run for the first time
Review comment:
or when the number of containers has been increased.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services