alnzng commented on code in PR #1615: URL: https://github.com/apache/samza/pull/1615#discussion_r909195736
########## samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java: ########## @@ -317,15 +317,55 @@ public Map<TaskName, Map<SystemStreamPartition, Startpoint>> fanOut(Map<TaskName * @return fanned out Startpoints */ public Map<SystemStreamPartition, Startpoint> getFanOutForTask(TaskName taskName) throws IOException { + return getStartpointFanOutPerTask(taskName) + .map(startpointFanOutPerTask -> ImmutableMap.copyOf(startpointFanOutPerTask.getFanOuts())).orElse(ImmutableMap.of()); + } + + private Optional<StartpointFanOutPerTask> getStartpointFanOutPerTask(TaskName taskName) throws IOException { Preconditions.checkState(!stopped, "Underlying metadata store not available"); Preconditions.checkNotNull(taskName, "TaskName cannot be null"); byte[] fanOutBytes = fanOutStore.get(toFanOutStoreKey(taskName)); if (ArrayUtils.isEmpty(fanOutBytes)) { - return ImmutableMap.of(); + return Optional.empty(); + } + return Optional.of(objectMapper.readValue(fanOutBytes, StartpointFanOutPerTask.class)); + } + + /** + * Remove the fanned out startpoints for the specified system stream partitions of the given task. This method + * allows partial removal of the fanned out startpoints for the given task. + * + * Remove the whole task fan out from the store if the fan outs of all system stream partitions of the task are + * removed. 
No action is taken if no system stream partition is specified. + * + * @param taskName to (partially) remove the fanned out startpoints for + * @param ssps to remove the fanned out startpoints for + */ + public void removeFanOutForTaskSSPs(TaskName taskName, Set<SystemStreamPartition> ssps) { + Preconditions.checkState(!stopped, "Underlying metadata store not available"); + Preconditions.checkNotNull(taskName, "TaskName cannot be null"); + if (ssps == null || ssps.isEmpty()) { + return; + } + try { + getStartpointFanOutPerTask(taskName).ifPresent(fanOutPerTask -> { + Map<SystemStreamPartition, Startpoint> fanOuts = fanOutPerTask.getFanOuts(); + fanOuts.entrySet().removeIf(e -> ssps.contains(e.getKey())); + if (fanOuts.isEmpty()) { + removeFanOutForTask(taskName); + } else { + try { + fanOutStore.put(toFanOutStoreKey(taskName), objectMapper.writeValueAsBytes(fanOutPerTask)); + } catch (IOException e) { + throw new SamzaException(e); Review Comment: sure, will do it here and below. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@samza.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org