shanthoosh commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation URL: https://github.com/apache/samza/pull/1027#discussion_r284050880
########## File path: samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java ########## @@ -173,71 +205,148 @@ public void deleteStartpoint(SystemStreamPartition ssp, TaskName taskName) { Preconditions.checkState(!stopped, "Underlying metadata store not available"); Preconditions.checkNotNull(ssp, "SystemStreamPartition cannot be null"); - metadataStore.delete(toStoreKey(ssp, taskName)); + readWriteStore.delete(toReadWriteStoreKey(ssp, taskName)); } /** - * For {@link Startpoint}s keyed only by {@link SystemStreamPartition}, this method re-maps the Startpoints from - * SystemStreamPartition to SystemStreamPartition+{@link TaskName} for all tasks provided by the {@link JobModel} + * The Startpoints that are written to with {@link #writeStartpoint(SystemStreamPartition, Startpoint)} and with + * {@link #writeStartpoint(SystemStreamPartition, TaskName, Startpoint)} are moved from a "read-write" namespace + * to a "fan out" namespace. * This method is not atomic or thread-safe. The intent is for the Samza Processor's coordinator to use this * method to assign the Startpoints to the appropriate tasks. - * @param jobModel The {@link JobModel} is used to determine which {@link TaskName} each {@link SystemStreamPartition} maps to. - * @return The list of {@link SystemStreamPartition}s that were fanned out to SystemStreamPartition+TaskName. + * @param taskToSSPs Determines which {@link TaskName} each {@link SystemStreamPartition} maps to. + * @return The set of active {@link TaskName}s that were fanned out to. 
*/ - public Set<SystemStreamPartition> fanOutStartpointsToTasks(JobModel jobModel) { + public Set<TaskName> fanOut(Map<TaskName, Set<SystemStreamPartition>> taskToSSPs) throws IOException { Preconditions.checkState(!stopped, "Underlying metadata store not available"); - Preconditions.checkNotNull(jobModel, "JobModel cannot be null"); - - HashSet<SystemStreamPartition> sspsToDelete = new HashSet<>(); - - // Inspect the job model for TaskName-to-SSPs mapping and re-map startpoints from SSP-only keys to SSP+TaskName keys. - for (ContainerModel containerModel: jobModel.getContainers().values()) { - for (TaskModel taskModel : containerModel.getTasks().values()) { - TaskName taskName = taskModel.getTaskName(); - for (SystemStreamPartition ssp : taskModel.getSystemStreamPartitions()) { - Startpoint startpoint = readStartpoint(ssp); // Read SSP-only key - if (startpoint == null) { - LOG.debug("No Startpoint for SSP: {} in task: {}", ssp, taskName); - continue; - } - - LOG.info("Grouping Startpoint keyed on SSP: {} to tasks determined by the job model.", ssp); - Startpoint startpointForTask = readStartpoint(ssp, taskName); - if (startpointForTask == null || startpointForTask.getCreationTimestamp() < startpoint.getCreationTimestamp()) { - writeStartpoint(ssp, taskName, startpoint); - sspsToDelete.add(ssp); // Mark for deletion - LOG.info("Startpoint for SSP: {} remapped with task: {}.", ssp, taskName); - } else { - LOG.info("Startpoint for SSP: {} and task: {} already exists and will not be overwritten.", ssp, taskName); - } + Preconditions.checkArgument(MapUtils.isNotEmpty(taskToSSPs), "taskToSSPs cannot be null or empty"); + + // construct fan out with the existing readWriteStore entries and mark the entries for deletion after fan out + Instant now = Instant.now(); + HashMultimap<SystemStreamPartition, TaskName> deleteKeys = HashMultimap.create(); + HashMap<TaskName, StartpointFanOutPerTask> fanOuts = new HashMap<>(); + for (TaskName taskName : taskToSSPs.keySet()) { + 
Set<SystemStreamPartition> ssps = taskToSSPs.get(taskName); + if (CollectionUtils.isEmpty(ssps)) { + LOG.warn("No SSPs are mapped to taskName: {}", taskName.getTaskName()); + continue; + } + for (SystemStreamPartition ssp : ssps) { + Startpoint startpoint = readStartpoint(ssp); // Read SSP-only key + if (startpoint != null) { + deleteKeys.put(ssp, null); + } + + Startpoint startpointForTask = readStartpoint(ssp, taskName); // Read SSP+taskName key + if (startpointForTask != null) { + deleteKeys.put(ssp, taskName); + } + + Startpoint startpointWithPrecedence = resolveStartpointPrecendence(startpoint, startpointForTask); + if (startpointWithPrecedence == null) { + continue; + } + fanOuts.putIfAbsent(taskName, new StartpointFanOutPerTask(now)); + fanOuts.get(taskName).getFanOuts().put(ssp, startpointWithPrecedence); + } + } + + if (fanOuts.isEmpty()) { + LOG.debug("No fan outs created."); + return ImmutableSet.of(); + } + + LOG.info("Fanning out to {} tasks", fanOuts.size()); + + // Fan out to store + for (TaskName taskName : fanOuts.keySet()) { + String fanOutKey = toFanOutStoreKey(taskName); + StartpointFanOutPerTask newFanOut = fanOuts.get(taskName); + fanOutStore.put(fanOutKey, objectMapper.writeValueAsBytes(newFanOut)); + } + + for (SystemStreamPartition ssp : deleteKeys.keySet()) { + for (TaskName taskName : deleteKeys.get(ssp)) { + if (taskName != null) { + deleteStartpoint(ssp, taskName); + } else { + deleteStartpoint(ssp); } } } - // Delete SSP-only keys - sspsToDelete.forEach(ssp -> { - deleteStartpoint(ssp); - LOG.info("All Startpoints for SSP: {} have been grouped to the appropriate tasks and the SSP was deleted."); - }); + return ImmutableSet.copyOf(fanOuts.keySet()); + } - return ImmutableSet.copyOf(sspsToDelete); + /** + * Read the fanned out {@link Startpoint}s for the given {@link TaskName} + * @param taskName to read the fan out Startpoints for + * @return fanned out Startpoints + */ + public Map<SystemStreamPartition, Startpoint> 
getFanOutForTask(TaskName taskName) throws IOException { + Preconditions.checkState(!stopped, "Underlying metadata store not available"); + Preconditions.checkNotNull(taskName, "TaskName cannot be null"); + + byte[] fanOutBytes = fanOutStore.get(toFanOutStoreKey(taskName)); + if (ArrayUtils.isEmpty(fanOutBytes)) { + return ImmutableMap.of(); + } + StartpointFanOutPerTask startpointFanOutPerTask = objectMapper.readValue(fanOutBytes, StartpointFanOutPerTask.class); + return ImmutableMap.copyOf(startpointFanOutPerTask.getFanOuts()); } /** - * Relinquish resources held by the underlying {@link MetadataStore} + * Deletes the fanned out {@link Startpoint} for the given {@link TaskName} + * @param taskName to delete the fan out Startpoints for */ - public void stop() { - stopped = true; - // Metadata store lifecycle is managed outside of the StartpointManager, so not closing it. + public void removeFanOutForTask(TaskName taskName) { + Preconditions.checkState(!stopped, "Underlying metadata store not available"); + Preconditions.checkNotNull(taskName, "TaskName cannot be null"); + + fanOutStore.delete(toFanOutStoreKey(taskName)); } @VisibleForTesting MetadataStore getMetadataStore() { return metadataStore; } - private static String toStoreKey(SystemStreamPartition ssp, TaskName taskName) { - return new String(new JsonSerdeV2<>().toBytes(new StartpointKey(ssp, taskName))); + @VisibleForTesting + MetadataStore getReadWriteStore() { + return readWriteStore; + } + + @VisibleForTesting + MetadataStore getFanOutStore() { + return fanOutStore; + } + + @VisibleForTesting + ObjectMapper getObjectMapper() { + return objectMapper; + } + + private static Startpoint resolveStartpointPrecendence(Startpoint startpoint1, Startpoint startpoint2) { + if (startpoint1 != null && startpoint2 != null) { + // if SSP-only and SSP+taskName startpoints both exist, resolve to the one with the latest timestamp + if (startpoint1.getCreationTimestamp() > startpoint2.getCreationTimestamp()) { + return 
startpoint1; + } + return startpoint2; + } + return startpoint1 != null ? startpoint1 : startpoint2; + } + + private static String toReadWriteStoreKey(SystemStreamPartition ssp, TaskName taskName) { + String storeKey = ssp.getSystem() + "." + ssp.getStream() + "." + String.valueOf(ssp.getPartition().getPartitionId()); Review comment: Just to be safe, it would be better to add not-null checks on the values dereferenced here (e.g. ssp and ssp.getPartition()) before accessing them. What do you think? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services