alnzng commented on a change in pull request #1515: URL: https://github.com/apache/samza/pull/1515#discussion_r690524892
########## File path: samza-core/src/main/java/org/apache/samza/coordinator/JobModelHelper.java ########## @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.coordinator; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; +import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; +import org.apache.samza.container.LocalityManager; +import org.apache.samza.container.TaskName; +import org.apache.samza.container.grouper.task.GrouperMetadata; +import org.apache.samza.container.grouper.task.GrouperMetadataImpl; +import org.apache.samza.container.grouper.task.TaskAssignmentManager; +import org.apache.samza.container.grouper.task.TaskPartitionAssignmentManager; +import org.apache.samza.job.model.ContainerModel; +import org.apache.samza.job.model.JobModel; +import org.apache.samza.job.model.ProcessorLocality; +import org.apache.samza.job.model.TaskMode; +import org.apache.samza.job.model.TaskModel; +import org.apache.samza.runtime.LocationId; +import org.apache.samza.system.StreamMetadataCache; +import org.apache.samza.system.SystemStreamPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class JobModelHelper { + private static final Logger LOG = LoggerFactory.getLogger(JobModelHelper.class); + + private final LocalityManager localityManager; + private final TaskAssignmentManager taskAssignmentManager; + private final TaskPartitionAssignmentManager taskPartitionAssignmentManager; + private final StreamMetadataCache streamMetadataCache; + private final JobModelCalculator jobModelCalculator; + + public JobModelHelper(LocalityManager localityManager, TaskAssignmentManager taskAssignmentManager, + TaskPartitionAssignmentManager taskPartitionAssignmentManager, StreamMetadataCache streamMetadataCache, + JobModelCalculator jobModelCalculator) { + this.localityManager = localityManager; + this.taskAssignmentManager = taskAssignmentManager; + this.taskPartitionAssignmentManager = taskPartitionAssignmentManager; + this.streamMetadataCache = streamMetadataCache; + this.jobModelCalculator = jobModelCalculator; + } + + public JobModel newJobModel(Config config, Map<TaskName, Integer> changelogPartitionMapping) { + GrouperMetadata grouperMetadata = getGrouperMetadata(config, this.localityManager, this.taskAssignmentManager, + this.taskPartitionAssignmentManager); + JobModel jobModel = + this.jobModelCalculator.calculateJobModel(config, changelogPartitionMapping, this.streamMetadataCache, + grouperMetadata); + updateTaskAssignments(jobModel, this.taskAssignmentManager, this.taskPartitionAssignmentManager, grouperMetadata); + return jobModel; + } + + private GrouperMetadata getGrouperMetadata(Config config, LocalityManager localityManager, + TaskAssignmentManager taskAssignmentManager, TaskPartitionAssignmentManager taskPartitionAssignmentManager) { + Map<String, LocationId> processorLocality = getProcessorLocality(config, localityManager); + Map<TaskName, TaskMode> taskModes = taskAssignmentManager.readTaskModes(); + + Map<TaskName, String> taskNameToProcessorId = new HashMap<>(); + Map<TaskName, LocationId> taskLocality = new HashMap<>(); + // We read the taskAssignment only for ActiveTasks, i.e., tasks that have no task-mode or have an active task mode + taskAssignmentManager.readTaskAssignment().forEach((taskNameString, containerId) -> { + TaskName taskName = new TaskName(taskNameString); + if (isActiveTask(taskName, taskModes)) { + taskNameToProcessorId.put(taskName, containerId); + if (processorLocality.containsKey(containerId)) { + taskLocality.put(taskName, processorLocality.get(containerId)); + } + } + }); + + Map<SystemStreamPartition, List<String>> sspToTaskMapping = + taskPartitionAssignmentManager.readTaskPartitionAssignments(); + Map<TaskName, List<SystemStreamPartition>> taskPartitionAssignments = new HashMap<>(); + // Task to partition assignments is stored as {@see SystemStreamPartition} to list of {@see TaskName} in + // coordinator stream. This is done due to the 1 MB value size limit in a kafka topic. Conversion to + // taskName to SystemStreamPartitions is done here to wire-in the data to {@see JobModel}. + sspToTaskMapping.forEach((systemStreamPartition, taskNames) -> taskNames.forEach(taskNameString -> { + TaskName taskName = new TaskName(taskNameString); + if (isActiveTask(taskName, taskModes)) { + taskPartitionAssignments.putIfAbsent(taskName, new ArrayList<>()); + taskPartitionAssignments.get(taskName).add(systemStreamPartition); + } + })); + return new GrouperMetadataImpl(processorLocality, taskLocality, taskPartitionAssignments, taskNameToProcessorId); + } + + /** + * Retrieves and returns the processor locality of a samza job using provided {@see Config} and {@see LocalityManager}. + * @param config provides the configurations defined by the user. Required to connect to the storage layer. + * @param localityManager provides the processor to host mapping persisted to the metadata store. + * @return the processor locality. + */ + private static Map<String, LocationId> getProcessorLocality(Config config, LocalityManager localityManager) { + Map<String, LocationId> containerToLocationId = new HashMap<>(); + Map<String, ProcessorLocality> existingContainerLocality = localityManager.readLocality().getProcessorLocalities(); + + for (int i = 0; i < new JobConfig(config).getContainerCount(); i++) { + String containerId = Integer.toString(i); + LocationId locationId = Optional.ofNullable(existingContainerLocality.get(containerId)) + .map(ProcessorLocality::host) + .filter(StringUtils::isNotEmpty) + .map(LocationId::new) + // To handle the case when the container count is increased between two different runs of a samza-yarn job, + // set the locality of newly added containers to any_host. + .orElse(new LocationId("ANY_HOST")); + containerToLocationId.put(containerId, locationId); + } + return containerToLocationId; + } + + /** + * This method does the following: + * 1. Deletes the existing task assignments if the partition-task grouping has changed from the previous run of the job. + * 2. Saves the newly generated task assignments to the storage layer through the {@param TaskAssignementManager}. + * + * @param jobModel represents the {@see JobModel} of the samza job. + * @param taskAssignmentManager required to persist the processor to task assignments to the metadata store. + * @param taskPartitionAssignmentManager required to persist the task to partition assignments to the metadata store. + * @param grouperMetadata provides the historical metadata of the samza application. + */ + private void updateTaskAssignments(JobModel jobModel, TaskAssignmentManager taskAssignmentManager, + TaskPartitionAssignmentManager taskPartitionAssignmentManager, GrouperMetadata grouperMetadata) { + LOG.info("Storing the task assignments into metadata store."); + Set<String> activeTaskNames = new HashSet<>(); + Set<String> standbyTaskNames = new HashSet<>(); + Set<SystemStreamPartition> systemStreamPartitions = new HashSet<>(); + + for (ContainerModel containerModel : jobModel.getContainers().values()) { + for (TaskModel taskModel : containerModel.getTasks().values()) { + if (TaskMode.Active.equals(taskModel.getTaskMode())) { + activeTaskNames.add(taskModel.getTaskName().getTaskName()); + } + if (TaskMode.Standby.equals(taskModel.getTaskMode())) { + standbyTaskNames.add(taskModel.getTaskName().getTaskName()); + } + systemStreamPartitions.addAll(taskModel.getSystemStreamPartitions()); + } + } + + Map<TaskName, String> previousTaskToContainerId = grouperMetadata.getPreviousTaskToProcessorAssignment(); + if (activeTaskNames.size() != previousTaskToContainerId.size()) { + LOG.warn(String.format( + "Current task count %s does not match saved task count %s. Stateful jobs may observe misalignment of keys!", + activeTaskNames.size(), previousTaskToContainerId.size())); + // If the tasks changed, then the partition-task grouping is also likely changed and we can't handle that + // without a much more complicated mapping. Further, the partition count may have changed, which means + // input message keys are likely reshuffled w.r.t. partitions, so the local state may not contain necessary + // data associated with the incoming keys. Warn the user and default to grouper + // In this scenario the tasks may have been reduced, so we need to delete all the existing messages + taskAssignmentManager.deleteTaskContainerMappings( + previousTaskToContainerId.keySet().stream().map(TaskName::getTaskName).collect(Collectors.toList())); + taskPartitionAssignmentManager.delete(systemStreamPartitions); + } + + // if the set of standby tasks has changed, e.g., when the replication-factor changed, or the active-tasks-set has + // changed, we log a warning and delete the existing mapping for these tasks + Set<String> previousStandbyTasks = taskAssignmentManager.readTaskModes() + .entrySet() + .stream() + .filter(taskNameToTaskModeEntry -> TaskMode.Standby.equals(taskNameToTaskModeEntry.getValue())) + .map(taskNameToTaskModeEntry -> taskNameToTaskModeEntry.getKey().getTaskName()) + .collect(Collectors.toSet()); + if (!standbyTaskNames.equals(previousStandbyTasks)) { Review comment: Interesting bug, thanks for the fix! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@samza.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org