alnzng commented on a change in pull request #1515:
URL: https://github.com/apache/samza/pull/1515#discussion_r690524892



##########
File path: 
samza-core/src/main/java/org/apache/samza/coordinator/JobModelHelper.java
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.container.grouper.task.GrouperMetadata;
+import org.apache.samza.container.grouper.task.GrouperMetadataImpl;
+import org.apache.samza.container.grouper.task.TaskAssignmentManager;
+import org.apache.samza.container.grouper.task.TaskPartitionAssignmentManager;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.ProcessorLocality;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.runtime.LocationId;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class JobModelHelper {
+  private static final Logger LOG = 
LoggerFactory.getLogger(JobModelHelper.class);
+
+  private final LocalityManager localityManager;
+  private final TaskAssignmentManager taskAssignmentManager;
+  private final TaskPartitionAssignmentManager taskPartitionAssignmentManager;
+  private final StreamMetadataCache streamMetadataCache;
+  private final JobModelCalculator jobModelCalculator;
+
+  public JobModelHelper(LocalityManager localityManager, TaskAssignmentManager 
taskAssignmentManager,
+      TaskPartitionAssignmentManager taskPartitionAssignmentManager, 
StreamMetadataCache streamMetadataCache,
+      JobModelCalculator jobModelCalculator) {
+    this.localityManager = localityManager;
+    this.taskAssignmentManager = taskAssignmentManager;
+    this.taskPartitionAssignmentManager = taskPartitionAssignmentManager;
+    this.streamMetadataCache = streamMetadataCache;
+    this.jobModelCalculator = jobModelCalculator;
+  }
+
+  public JobModel newJobModel(Config config, Map<TaskName, Integer> 
changelogPartitionMapping) {
+    GrouperMetadata grouperMetadata = getGrouperMetadata(config, 
this.localityManager, this.taskAssignmentManager,
+        this.taskPartitionAssignmentManager);
+    JobModel jobModel =
+        this.jobModelCalculator.calculateJobModel(config, 
changelogPartitionMapping, this.streamMetadataCache,
+            grouperMetadata);
+    updateTaskAssignments(jobModel, this.taskAssignmentManager, 
this.taskPartitionAssignmentManager, grouperMetadata);
+    return jobModel;
+  }
+
+  private GrouperMetadata getGrouperMetadata(Config config, LocalityManager 
localityManager,
+      TaskAssignmentManager taskAssignmentManager, 
TaskPartitionAssignmentManager taskPartitionAssignmentManager) {
+    Map<String, LocationId> processorLocality = getProcessorLocality(config, 
localityManager);
+    Map<TaskName, TaskMode> taskModes = taskAssignmentManager.readTaskModes();
+
+    Map<TaskName, String> taskNameToProcessorId = new HashMap<>();
+    Map<TaskName, LocationId> taskLocality = new HashMap<>();
+    // We read the taskAssignment only for ActiveTasks, i.e., tasks that have 
no task-mode or have an active task mode
+    taskAssignmentManager.readTaskAssignment().forEach((taskNameString, 
containerId) -> {
+      TaskName taskName = new TaskName(taskNameString);
+      if (isActiveTask(taskName, taskModes)) {
+        taskNameToProcessorId.put(taskName, containerId);
+        if (processorLocality.containsKey(containerId)) {
+          taskLocality.put(taskName, processorLocality.get(containerId));
+        }
+      }
+    });
+
+    Map<SystemStreamPartition, List<String>> sspToTaskMapping =
+        taskPartitionAssignmentManager.readTaskPartitionAssignments();
+    Map<TaskName, List<SystemStreamPartition>> taskPartitionAssignments = new 
HashMap<>();
+    // Task to partition assignments is stored as {@see SystemStreamPartition} 
to list of {@see TaskName} in
+    // coordinator stream. This is done due to the 1 MB value size limit in a 
kafka topic. Conversion to
+    // taskName to SystemStreamPartitions is done here to wire-in the data to 
{@see JobModel}.
+    sspToTaskMapping.forEach((systemStreamPartition, taskNames) -> 
taskNames.forEach(taskNameString -> {
+      TaskName taskName = new TaskName(taskNameString);
+      if (isActiveTask(taskName, taskModes)) {
+        taskPartitionAssignments.putIfAbsent(taskName, new ArrayList<>());
+        taskPartitionAssignments.get(taskName).add(systemStreamPartition);
+      }
+    }));
+    return new GrouperMetadataImpl(processorLocality, taskLocality, 
taskPartitionAssignments, taskNameToProcessorId);
+  }
+
+  /**
+   * Retrieves and returns the processor locality of a samza job using 
provided {@see Config} and {@see LocalityManager}.
+   * @param config provides the configurations defined by the user. Required 
to connect to the storage layer.
+   * @param localityManager provides the processor to host mapping persisted 
to the metadata store.
+   * @return the processor locality.
+   */
+  private static Map<String, LocationId> getProcessorLocality(Config config, 
LocalityManager localityManager) {
+    Map<String, LocationId> containerToLocationId = new HashMap<>();
+    Map<String, ProcessorLocality> existingContainerLocality = 
localityManager.readLocality().getProcessorLocalities();
+
+    for (int i = 0; i < new JobConfig(config).getContainerCount(); i++) {
+      String containerId = Integer.toString(i);
+      LocationId locationId = 
Optional.ofNullable(existingContainerLocality.get(containerId))
+          .map(ProcessorLocality::host)
+          .filter(StringUtils::isNotEmpty)
+          .map(LocationId::new)
+          // To handle the case when the container count is increased between 
two different runs of a samza-yarn job,
+          // set the locality of newly added containers to any_host.
+          .orElse(new LocationId("ANY_HOST"));
+      containerToLocationId.put(containerId, locationId);
+    }
+    return containerToLocationId;
+  }
+
+  /**
+   * This method does the following:
+   * 1. Deletes the existing task assignments if the partition-task grouping 
has changed from the previous run of the job.
+   * 2. Saves the newly generated task assignments to the storage layer 
through the {@param TaskAssignementManager}.
+   *
+   * @param jobModel              represents the {@see JobModel} of the samza 
job.
+   * @param taskAssignmentManager required to persist the processor to task 
assignments to the metadata store.
+   * @param taskPartitionAssignmentManager required to persist the task to 
partition assignments to the metadata store.
+   * @param grouperMetadata       provides the historical metadata of the 
samza application.
+   */
+  private void updateTaskAssignments(JobModel jobModel, TaskAssignmentManager 
taskAssignmentManager,
+      TaskPartitionAssignmentManager taskPartitionAssignmentManager, 
GrouperMetadata grouperMetadata) {
+    LOG.info("Storing the task assignments into metadata store.");
+    Set<String> activeTaskNames = new HashSet<>();
+    Set<String> standbyTaskNames = new HashSet<>();
+    Set<SystemStreamPartition> systemStreamPartitions = new HashSet<>();
+
+    for (ContainerModel containerModel : jobModel.getContainers().values()) {
+      for (TaskModel taskModel : containerModel.getTasks().values()) {
+        if (TaskMode.Active.equals(taskModel.getTaskMode())) {
+          activeTaskNames.add(taskModel.getTaskName().getTaskName());
+        }
+        if (TaskMode.Standby.equals(taskModel.getTaskMode())) {
+          standbyTaskNames.add(taskModel.getTaskName().getTaskName());
+        }
+        systemStreamPartitions.addAll(taskModel.getSystemStreamPartitions());
+      }
+    }
+
+    Map<TaskName, String> previousTaskToContainerId = 
grouperMetadata.getPreviousTaskToProcessorAssignment();
+    if (activeTaskNames.size() != previousTaskToContainerId.size()) {
+      LOG.warn(String.format(
+          "Current task count %s does not match saved task count %s. Stateful 
jobs may observe misalignment of keys!",
+          activeTaskNames.size(), previousTaskToContainerId.size()));
+      // If the tasks changed, then the partition-task grouping is also likely 
changed and we can't handle that
+      // without a much more complicated mapping. Further, the partition count 
may have changed, which means
+      // input message keys are likely reshuffled w.r.t. partitions, so the 
local state may not contain necessary
+      // data associated with the incoming keys. Warn the user and default to 
grouper
+      // In this scenario the tasks may have been reduced, so we need to 
delete all the existing messages
+      taskAssignmentManager.deleteTaskContainerMappings(
+          
previousTaskToContainerId.keySet().stream().map(TaskName::getTaskName).collect(Collectors.toList()));
+      taskPartitionAssignmentManager.delete(systemStreamPartitions);
+    }
+
+    // if the set of standby tasks has changed, e.g., when the 
replication-factor changed, or the active-tasks-set has
+    // changed, we log a warning and delete the existing mapping for these 
tasks
+    Set<String> previousStandbyTasks = taskAssignmentManager.readTaskModes()
+        .entrySet()
+        .stream()
+        .filter(taskNameToTaskModeEntry -> 
TaskMode.Standby.equals(taskNameToTaskModeEntry.getValue()))
+        .map(taskNameToTaskModeEntry -> 
taskNameToTaskModeEntry.getKey().getTaskName())
+        .collect(Collectors.toSet());
+    if (!standbyTaskNames.equals(previousStandbyTasks)) {

Review comment:
       Interesting bug, thanks for the fix!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@samza.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to