shanthoosh commented on code in PR #1615:
URL: https://github.com/apache/samza/pull/1615#discussion_r909087376


##########
samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java:
##########
@@ -317,15 +317,55 @@ public Map<TaskName, Map<SystemStreamPartition, 
Startpoint>> fanOut(Map<TaskName
    * @return fanned out Startpoints
    */
   public Map<SystemStreamPartition, Startpoint> getFanOutForTask(TaskName 
taskName) throws IOException {
+    return getStartpointFanOutPerTask(taskName)
+        .map(startpointFanOutPerTask -> 
ImmutableMap.copyOf(startpointFanOutPerTask.getFanOuts())).orElse(ImmutableMap.of());
+  }
+
+  private Optional<StartpointFanOutPerTask> 
getStartpointFanOutPerTask(TaskName taskName) throws IOException {
     Preconditions.checkState(!stopped, "Underlying metadata store not 
available");
     Preconditions.checkNotNull(taskName, "TaskName cannot be null");
 
     byte[] fanOutBytes = fanOutStore.get(toFanOutStoreKey(taskName));
     if (ArrayUtils.isEmpty(fanOutBytes)) {
-      return ImmutableMap.of();
+      return Optional.empty();
+    }
+    return Optional.of(objectMapper.readValue(fanOutBytes, 
StartpointFanOutPerTask.class));
+  }
+
+  /**
+   * Remove the fanned out startpoints for the specified the system stream 
partitions of the given task. This method
+   * allows to partially remove the fanned out startpoints for the given task.
+   *
+   * Remove the whole task fan out from the store if the fan outs of all 
system stream partitions of the task are
+   * removed. No action takes if not any specify system stream partition
+   *
+   * @param taskName to (partially) remove the fanned out startpoints for
+   * @param ssps to remove the fanned out startpoints for
+   */
+  public void removeFanOutForTaskSSPs(TaskName taskName, 
Set<SystemStreamPartition> ssps) {
+    Preconditions.checkState(!stopped, "Underlying metadata store not 
available");
+    Preconditions.checkNotNull(taskName, "TaskName cannot be null");
+    if (ssps == null || ssps.isEmpty()) {
+      return;
+    }
+    try {
+      getStartpointFanOutPerTask(taskName).ifPresent(fanOutPerTask -> {
+        Map<SystemStreamPartition, Startpoint> fanOuts = 
fanOutPerTask.getFanOuts();
+        fanOuts.entrySet().removeIf(e -> ssps.contains(e.getKey()));
+        if (fanOuts.isEmpty()) {
+          removeFanOutForTask(taskName);

Review Comment:
   Minor: Would be better to log here to indicate that we are deleting the 
fanout for the task.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@samza.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to