This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push: new 0ac55be8b Revert elasticity commits - bug found with broadcast input (#1626) 0ac55be8b is described below commit 0ac55be8bfb8bbda5a27b6c8ba20c626bcba88d1 Author: lakshmi-manasa-g <mgadup...@linkedin.com> AuthorDate: Thu Aug 4 16:40:34 2022 -0700 Revert elasticity commits - bug found with broadcast input (#1626) Symptom: Broadcast input ssp is not consumed by all containers of the job. Cause: Elasticity code changing systemconsumers and samza-core Changes: reverting all elasticity commits post SAMZA-2719: [Elasticity] fix container level metrics when elasticity is enabled #1585 after which this issue was detected. updating the checkpointv1 serde to accept checkpoints written with SAMZA-2743: [Elasticity] Add keybucket into SSP serde for checkpoint #1608 list of elasticity PRs being reverted - #1625 #1610 #1608 #1607 #1603 #1598 #1597 #1596 #1589 --- .../apache/samza/checkpoint/CheckpointManager.java | 13 - .../samza/system/IncomingMessageEnvelope.java | 4 +- .../java/org/apache/samza/config/JobConfig.java | 5 +- .../java/org/apache/samza/container/RunLoop.java | 15 +- .../container/grouper/stream/GroupByPartition.java | 2 +- .../stream/GroupBySystemStreamPartition.java | 3 +- .../samza/elasticity/util/ElasticityUtils.java | 493 --------------------- .../samza/elasticity/util/TaskNameComponents.java | 84 ---- .../samza/serializers/model/SamzaObjectMapper.java | 5 +- .../apache/samza/checkpoint/OffsetManager.scala | 40 +- .../org/apache/samza/container/TaskInstance.scala | 28 +- .../samza/serializers/CheckpointV1Serde.scala | 3 +- .../org/apache/samza/config/TestJobConfig.java | 10 - .../org/apache/samza/container/TestRunLoop.java | 33 -- .../grouper/stream/TestGroupByPartition.java | 12 +- .../stream/TestGroupBySystemStreamPartition.java | 24 +- .../samza/elasticity/util/TestElasticityUtils.java | 435 ------------------ .../serializers/model/TestSamzaObjectMapper.java | 70 +-- .../samza/checkpoint/TestOffsetManager.scala | 41 -- .../apache/samza/container/TestTaskInstance.scala | 22 +- .../samza/serializers/TestCheckpointV1Serde.scala | 82 +--- .../checkpoint/kafka/KafkaCheckpointManager.scala | 32 +- .../kafka/TestKafkaCheckpointManager.java | 19 - 23 files changed, 62 insertions(+), 1413 deletions(-) diff --git a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java index 4e76bb2fb..4dcefaa4e 100644 --- a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java +++ b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java @@ -19,8 +19,6 @@ package org.apache.samza.checkpoint; -import java.util.HashMap; -import java.util.Map; import org.apache.samza.container.TaskName; /** @@ -68,15 +66,4 @@ public interface CheckpointManager { * Clear the checkpoints in the checkpoint stream. */ default void clearCheckpoints() { } - - /** - * Returns the last recorded checkpoint for all tasks present in the implementation-specific location. - * All tasks contains all the tasks within the current job model. - * All tasks also includes tasks which may have been part of the job model during a previous deploy. - * @return A Map of TaskName to Checkpoint object. - * The Checkpoint object has the recorded offset data of the specified partition. 
- */ - default Map<TaskName, Checkpoint> readAllCheckpoints() { - return new HashMap<>(); - }; } diff --git a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java index 1f4a74096..39081703c 100644 --- a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java +++ b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java @@ -132,9 +132,7 @@ public class IncomingMessageEnvelope { if (envelopeKeyorOffset == null) { return new SystemStreamPartition(systemStreamPartition, 0); } - // modulo 31 first to best spread out the hashcode and then modulo elasticityFactor for actual keyBucket - // Note: elasticityFactor <= 16 so modulo 31 is safe to do. - int keyBucket = (Math.abs(envelopeKeyorOffset.hashCode()) % 31) % elasticityFactor; + int keyBucket = Math.abs(envelopeKeyorOffset.hashCode()) % elasticityFactor; return new SystemStreamPartition(systemStreamPartition, keyBucket); } diff --git a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java index d36ac6671..b9aa82cf2 100644 --- a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java @@ -190,6 +190,7 @@ public class JobConfig extends MapConfig { public static final String CONTAINER_HEARTBEAT_MONITOR_ENABLED = "job.container.heartbeat.monitor.enabled"; private static final boolean CONTAINER_HEARTBEAT_MONITOR_ENABLED_DEFAULT = true; + // Enabled elasticity for the job // number of (elastic) tasks in the job will be old task count X elasticity factor public static final String JOB_ELASTICITY_FACTOR = "job.elasticity.factor"; @@ -517,8 +518,8 @@ public class JobConfig extends MapConfig { public int getElasticityFactor() { int elasticityFactor = getInt(JOB_ELASTICITY_FACTOR, DEFAULT_JOB_ELASTICITY_FACTOR); - if (elasticityFactor < 1 || elasticityFactor > 16) { - throw new ConfigException("Elasticity factor can not be less than 1 or greater than 16"); + if (elasticityFactor < 1) { + throw new ConfigException("Elasticity factor can not be less than 1"); } return elasticityFactor; } diff --git a/samza-core/src/main/java/org/apache/samza/container/RunLoop.java b/samza-core/src/main/java/org/apache/samza/container/RunLoop.java index 5bbcbae7c..3334adc74 100644 --- a/samza-core/src/main/java/org/apache/samza/container/RunLoop.java +++ b/samza-core/src/main/java/org/apache/samza/container/RunLoop.java @@ -860,19 +860,8 @@ public class RunLoop implements Runnable, Throttleable { IncomingMessageEnvelope envelope = pendingEnvelope.envelope; if (envelope.isEndOfStream()) { - if (elasticityFactor <= 1) { - SystemStreamPartition ssp = envelope.getSystemStreamPartition(); - processingSspSet.remove(ssp); - } else { - // if envelope is end of stream, the ssp of envelope should be removed from task's processing set irresp of keyBucket - SystemStreamPartition sspOfEnvelope = envelope.getSystemStreamPartition(); - Optional<SystemStreamPartition> ssp = processingSspSet.stream() - .filter(sspInSet -> sspInSet.getSystemStream().equals(sspOfEnvelope.getSystemStream()) - && sspInSet.getPartition().equals(sspOfEnvelope.getPartition())) - .findFirst(); - ssp.ifPresent(processingSspSet::remove); - ssp.ifPresent(processingSspSetToDrain::remove); - } + SystemStreamPartition ssp = envelope.getSystemStreamPartition(elasticityFactor); + processingSspSet.remove(ssp); if 
(!hasIntermediateStreams) { pendingEnvelopeQueue.remove(); } diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java index 53e740005..1efd17217 100644 --- a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java @@ -64,7 +64,7 @@ public class GroupByPartition implements SystemStreamPartitionGrouper { int keyBucket = elasticityFactor == 1 ? -1 : i; String taskNameStr = elasticityFactor == 1 ? String.format("Partition %d", ssp.getPartition().getPartitionId()) : - String.format("Partition %d_%d_%d", ssp.getPartition().getPartitionId(), keyBucket, elasticityFactor); + String.format("Partition %d %d", ssp.getPartition().getPartitionId(), keyBucket); TaskName taskName = new TaskName(taskNameStr); SystemStreamPartition sspWithKeyBucket = new SystemStreamPartition(ssp, keyBucket); groupedMap.putIfAbsent(taskName, new HashSet<>()); diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java index 652cf15cc..944304843 100644 --- a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java @@ -63,8 +63,7 @@ public class GroupBySystemStreamPartition implements SystemStreamPartitionGroupe SystemStreamPartition sspWithKeyBucket = new SystemStreamPartition(ssp, keyBucket); HashSet<SystemStreamPartition> sspSet = new HashSet<SystemStreamPartition>(); sspSet.add(sspWithKeyBucket); - String elasticitySuffix = elasticityFactor == 1 ? "" : String.format("_%d", elasticityFactor); - groupedMap.put(new TaskName(sspWithKeyBucket.toString() + elasticitySuffix), sspSet); + groupedMap.put(new TaskName(sspWithKeyBucket.toString()), sspSet); } } diff --git a/samza-core/src/main/java/org/apache/samza/elasticity/util/ElasticityUtils.java b/samza-core/src/main/java/org/apache/samza/elasticity/util/ElasticityUtils.java deleted file mode 100644 index 90c37480d..000000000 --- a/samza-core/src/main/java/org/apache/samza/elasticity/util/ElasticityUtils.java +++ /dev/null @@ -1,493 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.samza.elasticity.util; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.Set; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import java.util.stream.Collectors; -import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.samza.checkpoint.Checkpoint; -import org.apache.samza.container.TaskName; -import org.apache.samza.system.SystemAdmin; -import org.apache.samza.system.SystemAdmins; -import org.apache.samza.system.SystemStreamPartition; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * Class with util methods to be used for checkpoint computation when elasticity is enabled - * Elasticity is supported only for tasks created by either - * the {@link org.apache.samza.container.grouper.stream.GroupByPartition} SSP grouper or - * the {@link org.apache.samza.container.grouper.stream.GroupBySystemStreamPartition} SSP grouper - */ -public class ElasticityUtils { - private static final Logger log = LoggerFactory.getLogger(ElasticityUtils.class); - - // GroupByPartition tasks have names like Partition 0_1_2 - // where 0 is the partition number, 1 is the key bucket and 2 is the elasticity factor - // see {@link GroupByPartition.ELASTIC_TASK_NAME_FORMAT} - private static final String ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX = "Partition (\\d+)_(\\d+)_(\\d+)"; - private static final Pattern ELASTIC_TASK_NAME_GROUP_BY_PARTITION_PATTERN = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX); - private static final String TASK_NAME_GROUP_BY_PARTITION_REGEX = "Partition (\\d+)"; - private static final Pattern TASK_NAME_GROUP_BY_PARTITION_PATTERN = Pattern.compile(TASK_NAME_GROUP_BY_PARTITION_REGEX); - private static final String TASK_NAME_GROUP_BY_PARTITION_PREFIX = "Partition "; - - //GroupBySSP tasks have names like "SystemStreamPartition [<system>, <Stream>, <partition>, keyBucket]_2" - // where 2 is the elasticity factor - // see {@link GroupBySystemStreamPartition} and {@link SystemStreamPartition.toString} - private static final String ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX = "SystemStreamPartition \\[(\\S+), (\\S+), (\\d+), (\\d+)\\]_(\\d+)"; - private static final Pattern ELASTIC_TASK_NAME_GROUP_BY_SSP_PATTERN = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX); - private static final String TASK_NAME_GROUP_BY_SSP_REGEX = "SystemStreamPartition \\[(\\S+), (\\S+), (\\d+)\\]"; - private static final Pattern TASK_NAME_GROUP_BY_SSP_PATTERN = Pattern.compile(TASK_NAME_GROUP_BY_SSP_REGEX); - private static final String TASK_NAME_GROUP_BY_SSP_PREFIX = "SystemStreamPartition "; - - /** - * Elasticity is supported for GroupByPartition tasks and GroupBySystemStreamPartition tasks - * When elasticity is enabled, GroupByPartition tasks have names Partition 0_1_2 - * When elasticity is enabled, GroupBySystemStreamPartition tasks have names SystemStreamPartition [systemA, streamB, 0, 1]_2 - * Both tasks have names ending with _%d where %d is the elasticity factor - * @param taskName of either GroupByPartition or GroupBySystemStreamPartition task - * @return - * for GroupByPartition and GroupBySystemStreamPartition tasks returns elasticity factor from the task name - * for other tasks returns 1 which is the default elasticity factor - */ - static int getElasticityFactorFromTaskName(TaskName taskName) { - return getTaskNameParts(taskName).elasticityFactor; - } - - /** - * checks 
if the given taskname is of a GroupByPartition task - * @param taskName of any task - * @return true if GroupByPartition (starts with prefix "Partition ") or false otherwise - */ - static boolean isGroupByPartitionTask(TaskName taskName) { - return taskName.getTaskName().startsWith(TASK_NAME_GROUP_BY_PARTITION_PREFIX); - } - - /** - * checks if the given taskname is of a GroupBySystemStreamPartition task - * @param taskName of any task - * @return true if GroupBySystemStreamPartition (starts with prefix "SystemStreamPartition ") or false otherwise - */ - static boolean isGroupBySystemStreamPartitionTask(TaskName taskName) { - return taskName.getTaskName().startsWith(TASK_NAME_GROUP_BY_SSP_PREFIX); - } - - /** - * checks if given taskName is elastic aka created with an elasticity factor > 1 - * @param taskName of any task - * @return true for following, false otherwise - * for task created by GroupByPartition, taskName has format "Partition 0_1_2" - * for task created by GroupBySystemStreamPartition, taskName has format "SystemStreamPartition [systemA, streamB, 0, 1]_2" - */ - static boolean isTaskNameElastic(TaskName taskName) { - if (isGroupByPartitionTask(taskName)) { - Matcher m = ELASTIC_TASK_NAME_GROUP_BY_PARTITION_PATTERN.matcher(taskName.getTaskName()); - return m.find(); - } else if (isGroupBySystemStreamPartitionTask(taskName)) { - Matcher m = ELASTIC_TASK_NAME_GROUP_BY_SSP_PATTERN.matcher(taskName.getTaskName()); - return m.find(); - } - return false; - } - - /** - * From given taskName extract the values for system, stream, partition, keyBucket and elasticityFactor - * @param taskName any taskName - * @return TaskNameComponents object containing system, stream, partition, keyBucket and elasticityFactor - * for GroupByPartition task: - * taskNames are of the format "Partition 0_1_2" (with elasticity) or "Partition 0" (without elasticity) - * system and stream are empty "" strings and partition is the input partition, - * without elasticity, keyBucket = 0 and elasticityFactor = 1 (the default values) - * with elasticity, keyBucket from name (ex 1 above) and elasticityFactor (ex 2 from above) - * for GroupBySystemStreamPartition task: - * taskNames are of the format "SystemStreamPartition [systemA, streamB, 0, 1]_2" (with elasticity) or - * "SystemStreamPartition [systemA, streamB, 0]" (without elasticity) - * system and stream and partition are from the name (ex system = systemA, steram = streamB, partition =0 above) - * without elasticity, keyBucket = 0 and elasticityFactor = 1 (the default values) - * with elasticity, keyBucket from name (ex 1 above) and elasticityFactor (ex 2 from above) - * for tasks created with other SSP groupers: - * default TaskNameComponents is returned which has empty system, stream, - * -1 for partition and 0 for keyBucket and 1 for elasticity factor - */ - static TaskNameComponents getTaskNameParts(TaskName taskName) { - if (isGroupByPartitionTask(taskName)) { - return getTaskNameParts_GroupByPartition(taskName); - } else if (isGroupBySystemStreamPartitionTask(taskName)) { - return getTaskNameParts_GroupBySSP(taskName); - } - log.warn("TaskName {} is neither GroupByPartition nor GroupBySystemStreamPartition task. " - + "Elasticity is not supported for this taskName. 
" - + "Returning default TaskNameComponents which has default keyBucket 0," - + " default elasticityFactor 1 and invalid partition -1", taskName.getTaskName()); - return new TaskNameComponents(TaskNameComponents.INVALID_PARTITION); - } - - /** - * see doc for getTaskNameParts above - */ - private static TaskNameComponents getTaskNameParts_GroupByPartition(TaskName taskName) { - String taskNameStr = taskName.getTaskName(); - log.debug("GetTaskNameParts for taskName {}", taskNameStr); - - Matcher matcher = ELASTIC_TASK_NAME_GROUP_BY_PARTITION_PATTERN.matcher(taskNameStr); - if (matcher.find()) { - return new TaskNameComponents(Integer.valueOf(matcher.group(1)), - Integer.valueOf(matcher.group(2)), - Integer.valueOf(matcher.group(3))); - } - matcher = TASK_NAME_GROUP_BY_PARTITION_PATTERN.matcher(taskNameStr); - if (matcher.find()) { - return new TaskNameComponents(Integer.valueOf(matcher.group(1))); - } - log.error("Could not extract partition, keybucket and elasticity factor from taskname for task {}.", taskNameStr); - throw new IllegalArgumentException("TaskName format incompatible"); - } - - /** - * see doc for getTaskNameParts above - */ - private static TaskNameComponents getTaskNameParts_GroupBySSP(TaskName taskName) { - String taskNameStr = taskName.getTaskName(); - log.debug("GetTaskNameParts for taskName {}", taskNameStr); - - Matcher matcher = ELASTIC_TASK_NAME_GROUP_BY_SSP_PATTERN.matcher(taskNameStr); - if (matcher.find()) { - return new TaskNameComponents(matcher.group(1), - matcher.group(2), - Integer.valueOf(matcher.group(3)), - Integer.valueOf(matcher.group(4)), - Integer.valueOf(matcher.group(5))); - } - matcher = TASK_NAME_GROUP_BY_SSP_PATTERN.matcher(taskNameStr); - if (matcher.find()) { - return new TaskNameComponents(matcher.group(1), - matcher.group(2), - Integer.valueOf(matcher.group(3))); - } - log.warn("Could not extract system, stream, partition, keybucket and elasticity factor from taskname for task {}.", taskNameStr); - throw new IllegalArgumentException("TaskName format incompatible"); - } - - /** - * Without elasticity, a task consumes an entire (full) SSP = [System, stream, partition]. - * With elasticity, a task consumes a portion of the SSP_withKeyBucket = [system, stream, partition, keyBucket] - * where 0 <= keyBucket < elasticityFactor and contains a subset of the IncomingMessageEnvelope(IME) from the full SSP - * Given two tasks currentTask and otherTask, the task otherTask is called ancestor of currentTask if the following is true - * all IME consumed by currentTask will be consumed by otherTask when elasticityFactor decreases or stays same - * For example: - * case 1: elasticityFactor 2 to 1 - * otherTask = Partition 0 consuming all IME in SSP = [systemA, streamB, 0] when elasticityFactor=1 - * currentTask1 = Partition 0_0_2 consumes IME in SSP_withKeyBucket0 = [systemA, streamB, 0, 0 (keyBucket)] when elasticityFactor = 2 - * currentTask2 = Partition 0_1_2 consumes IME in SSP_withKeyBucket1 = [systemA, streamB, 0, 1 (keyBucket)] when elasticityFactor = 2 - * SSP = SSP_withKeyBucket0 + SSP_withKeyBucket1. Thus, Partition 0 is ancestor of Partition 0_0_2 and Partition 0_1_2 - * case 2: elasticityFactor 2 to 2 - no change - * Partition 0_0_2 is an ancestor of itself since the input SSP_withKeyBucket0 doesnt change - * similarly Partition 0_1_2 is an ancestor of itself. 
This applies to all elasticityFactors - * case 3: elasticityFactor 4 to 2 - * otherTask = Partition 0_0_2 consuming all IME in SSP_withKeyBucket0 = [systemA, streamB, 0, 0] when elasticityFactor=2 - * currentTask1 = Partition 0_0_4 consumes IME in SSP_withKeyBucket00 = [systemA, streamB, 0, 0 (keyBucket)] when elasticityFactor = 4 - * currentTask2 = Partition 0_2_4 consumes IME in SSP_withKeyBucket01 = [systemA, streamB, 0, 2 (keyBucket)] when elasticityFactor = 4 - * From the computation of SSP_withkeyBucket in {@link org.apache.samza.system.IncomingMessageEnvelope} - * we have getSystemStreamPartition(int elasticityFactor) which does keyBucket = (Math.abs(envelopeKeyorOffset.hashCode()) % 31) % elasticityFactor; - * Thus, SSP_withKeyBucket0 = SSP_withKeyBucket00 + SSP_withKeyBucket01. - * Thus, Partition 0_0_2 is ancestor of Partition 0_0_4 and Partition 0_2_4 - * Similarly, Partition 0_1_2 is ancestor of Partition 0_1_4 and Partition 0_3_4 - * And transitively, Partition 0 is ancestor of Partition 0_0_4, Partition 0_1_4, Partition 0_2_4 and Partition 0_3_4 - * - * This applies to tasks created by GroupByPartition and GroupBySystemStreamPartition SSPGroupers. - * aka this applies if both currentTask and otherTask are created by GroupByPartition or both are created by GroupBySystemStreamPartition - * If either currentTask and/or otherTask were created by other SSPGroupers then false is returned. - * @param currentTask - * @param otherTask - * @return true if otherTask is ancestor of currentTask, false otherwise - */ - static boolean isOtherTaskAncestorOfCurrentTask(TaskName currentTask, TaskName otherTask) { - log.debug("isOtherTaskAncestorOfCurrentTask with currentTask {} and otherTask {}", currentTask, otherTask); - if (!((isGroupByPartitionTask(currentTask) && isGroupByPartitionTask(otherTask)) - || (isGroupBySystemStreamPartitionTask(currentTask) && isGroupBySystemStreamPartitionTask(otherTask)))) { - return false; - } - - TaskNameComponents currentTaskNameComponents = getTaskNameParts(currentTask); - TaskNameComponents otherTaskNameComponents = getTaskNameParts(otherTask); - - if (!otherTaskNameComponents.system.equals(currentTaskNameComponents.system) - || !otherTaskNameComponents.stream.equals(currentTaskNameComponents.stream) - || otherTaskNameComponents.partition != currentTaskNameComponents.partition - || otherTaskNameComponents.elasticityFactor > currentTaskNameComponents.elasticityFactor) { - return false; - } - - return (currentTaskNameComponents.keyBucket % otherTaskNameComponents.elasticityFactor) == otherTaskNameComponents.keyBucket; - } - - /** - * See javadoc for isOtherTaskAncestorOfCurrentTask above - * Given currentTask and otherTask, - * if currentTask == otherTask, then its not a descendant. 
(unlike ancestor) - * else, if isOtherTaskAncestorOfCurrentTask(otherTask, currentTask) then otherTask is descendant of currentTask - * @param currentTask - * @param otherTask - * @return - */ - static boolean isOtherTaskDescendantOfCurrentTask(TaskName currentTask, TaskName otherTask) { - log.debug("isOtherTaskDescendantOfCurrentTask with currentTask {} and otherTask {}", currentTask, otherTask); - if (!((isGroupByPartitionTask(currentTask) && isGroupByPartitionTask(otherTask)) - || (isGroupBySystemStreamPartitionTask(currentTask) && isGroupBySystemStreamPartitionTask(otherTask)))) { - return false; - } - - TaskNameComponents currentTaskNameComponents = getTaskNameParts(currentTask); - TaskNameComponents otherTaskNameComponents = getTaskNameParts(otherTask); - - if (!otherTaskNameComponents.system.equals(currentTaskNameComponents.system) - || !otherTaskNameComponents.stream.equals(currentTaskNameComponents.stream) - || otherTaskNameComponents.partition != currentTaskNameComponents.partition - || otherTaskNameComponents.elasticityFactor <= currentTaskNameComponents.elasticityFactor) { - return false; - } - - return ( - otherTaskNameComponents.keyBucket % currentTaskNameComponents.elasticityFactor) == currentTaskNameComponents.keyBucket; - } - - /** - * For a given taskName and a map of task names to checkpoints, returns the taskName's ancestor and descendants checkpoints - * All ancestor checkpoints are put into a set - * Descendant checkpoins are put into a map of elasticityFactor to descendant checkpoint where the elastictyFactor is of the descendant. - * For example, given taskName Partition 0_0_2 and checkpoint Map (Partition 0->C1, Partition 0_0_4-> C2, Partition 0_1_4 -> C3, Partition 0_2_4 ->C4) - * the return value is AncestorSet = <C1> and descendantMap = (4 -> <C2, C4>) - * See javadoc of isOtherTaskAncestorOfCurrentTask and isOtherTaskDescendantOfCurrentTask for definition of ancestor and descendant - * @param taskName name of the task - * @param checkpointMap map from taskName to checkpoint - * @return Pair of AncestorCheckpoint set and Descendant Checkpoint Map - */ - static Pair<Set<Checkpoint>, Map<Integer, Set<Checkpoint>>> getAncestorAndDescendantCheckpoints( - TaskName taskName, Map<TaskName, Checkpoint> checkpointMap) { - Set<Checkpoint> ancestorCheckpoints = new HashSet<>(); - Map<Integer, Set<Checkpoint>> descendantCheckpoints = new HashMap<>(); - log.debug("starting to parse the checkpoint map to find ancestors and descendants for taskName {}", taskName.getTaskName()); - checkpointMap.keySet().forEach(otherTaskName -> { - Checkpoint otherTaskCheckpoint = checkpointMap.get(otherTaskName); - if (isOtherTaskAncestorOfCurrentTask(taskName, otherTaskName)) { - log.debug("current task name is {} and other task name is {} and other task is ancestor", taskName, otherTaskName); - ancestorCheckpoints.add(otherTaskCheckpoint); - } - if (isOtherTaskDescendantOfCurrentTask(taskName, otherTaskName)) { - log.debug("current task name is {} and other task name is {} and other task is descendant", taskName, otherTaskName); - int otherEF = getElasticityFactorFromTaskName(otherTaskName); - if (!descendantCheckpoints.containsKey(otherEF)) { - descendantCheckpoints.put(otherEF, new HashSet<>()); - } - descendantCheckpoints.get(otherEF).add(otherTaskCheckpoint); - } - }); - log.debug("done computing all ancestors and descendants of {}", taskName); - return new ImmutablePair<>(ancestorCheckpoints, descendantCheckpoints); - } - - /** - * Given a checkpoint with offset map from 
SystemStreamPartition to offset, returns the offset for the desired ssp - * Only the system, stream and partition portions of the SSP are matched, the keyBucket is not considered. - * A checkpoint belongs to one task and a task would consume either the full SSP (aka no keyBucket) - * or consume exactly one of the keyBuckets of an SSP. Hence there will be at most one entry for an SSP in a checkpoint - * @param checkpoint Checkpoint containing SSP -> offset - * @param ssp SystemStreamPartition for which an offset needs to be fetched - * @return offset for the ssp in the Checkpoint or null if doesnt exist. - */ - static String getOffsetForSSPInCheckpoint(Checkpoint checkpoint, SystemStreamPartition ssp) { - String checkpointStr = checkpoint.getOffsets().entrySet().stream() - .map(k -> k.getKey() + " : " + k.getValue()) - .collect(Collectors.joining(", ", "{", "}")); - log.debug("for ssp {}, in checkpoint {}", ssp, checkpointStr); - - Optional<String> offsetFound = checkpoint.getOffsets().entrySet() - .stream() - .filter(entry -> entry.getKey().getSystemStream().equals(ssp.getSystemStream()) && entry.getKey() - .getPartition() - .equals(ssp.getPartition())) - .map(Map.Entry::getValue) - .findFirst(); - if (offsetFound.isPresent()) { - return offsetFound.get(); - } - log.warn("Could not find offset for ssp {} in checkpoint {}. returning null string as offset", ssp, checkpoint); - return null; - } - - /** - * Given a set of checkpoints, find the max aka largest offset for an ssp - * Largest is determined by the SystemAdmin.offsetCompartor of the ssp's system. - * Only the system, stream and partition portions of the SSP are matched, the keyBucket is not considered. - * @param checkpointSet set of checkpoints - * @param ssp for which largest offset is needed - * @param systemAdmin of the ssp.getSystem() - * @return offset - string if one exists else null - */ - static String getMaxOffsetForSSPInCheckpointSet(Set<Checkpoint> checkpointSet, - SystemStreamPartition ssp, SystemAdmin systemAdmin) { - return checkpointSet.stream() - .filter(Objects::nonNull) - .map(checkpoint -> getOffsetForSSPInCheckpoint(checkpoint, ssp)) - .filter(Objects::nonNull) - .sorted((offset1, offset2) -> systemAdmin.offsetComparator(offset2, offset1)) //confirm reverse sort - aka largest offset first - .findFirst().orElse(null); - } - - /** - * Given a set of checkpoints, find the min aka smallest offset for an ssp - * Smallest is determined by the SystemAdmin.offsetCompartor of the ssp's system. - * Only the system, stream and partition portions of the SSP are matched, the keyBucket is not considered. 
- * @param checkpointSet set of checkpoints - * @param ssp for which largest offset is needed - * @param systemAdmin of the ssp.getSystem() - * @return offset - string if one exists else null - */ - static String getMinOffsetForSSPInCheckpointSet(Set<Checkpoint> checkpointSet, - SystemStreamPartition ssp, SystemAdmin systemAdmin) { - return checkpointSet.stream() - .filter(Objects::nonNull) - .map(checkpoint -> getOffsetForSSPInCheckpoint(checkpoint, ssp)) - .filter(Objects::nonNull) - .sorted((offset1, offset2) -> systemAdmin.offsetComparator(offset1, offset2)) //confirm ascending sort - aka smallest offset first - .findFirst().orElse(null); - } - - /** - * Prereq: See javadoc for isOtherTaskAncestorOfCurrentTask and isOtherTaskDescendantOfCurrentTask to fully understand ancestor and descendant notion - * Briefly, Given tasks - Partition 0, Partition 0_0_2, Partition 0_1_2 and Partition 0_0_4, Partition 0_1_4, Partition 0_2_4 and Partition 0_3_4 - * (recall Partition 0_1_2 means reads input partition 0, keyBucket 1 and elasticityFactor 2) - * For task Partition 0_0_2: ancestors = [Partition 0, Partition 0_0_2] and descendants = [Partition 0_0_4, Partition 0_2_4] - * - * If a task has no descendants, then we just need to pick the largest offset among all the ancestors to get the last processed offset. - * for example above, if Partition 0_0_2 only had ancestors and no descendants, taking largest offset among Partition 0 and 0_0_2 gives last proc offset. - * - * With descendants, a little care is needed. there could be descendants with different elasticity factors. - * given one elasticity factor, each the descendant within the elasticity factor consumes a sub-portion (aka keyBucket) of the task. - * hence, to avoid data loss, we need to pick the lowest offset across descendants of the same elasticity factor. - * Across elasticity factors, largest works just like in ancestor - * - * Taking a concrete example - * From {@link org.apache.samza.system.IncomingMessageEnvelope} (IME) - * Partition 0 consunmig all IME in SSP = [systemA, streamB, 0] when elasticityFactor=1 - * Partition 0_1_2 consuming all IME in SSP_withKeyBucket0 = [systemA, streamB, 0, 1 (keyBucket)] when elasticityFactor=2 - * Partition 0_0_2 consuming all IME in SSP_withKeyBucket1 = [systemA, streamB, 0, 0 (keyBucket)] when elasticityFactor=2 - * Partition 0_0_4 consumes IME in SSP_withKeyBucket00 = [systemA, streamB, 0, 0 (keyBucket)] when elasticityFactor = 4 - * Partition 0_2_4 consumes IME in SSP_withKeyBucket01 = [systemA, streamB, 0, 2 (keyBucket)] when elasticityFactor = 4 - * From the computation of SSP_withkeyBucket in {@link org.apache.samza.system.IncomingMessageEnvelope} - * we have getSystemStreamPartition(int elasticityFactor) which does keyBucket = (Math.abs(envelopeKeyorOffset.hashCode()) % 31) % elasticityFactor; - * Thus, - * SSP = SSP_withKeyBucket0 + SSP_withKeyBucket1. - * SSP_withKeyBucket0 = SSP_withKeyBucket00 + SSP_withKeyBucket01. - * If the checkpoint map has - * Partition 0: (SSP : 1), Partition 0_0_2: (SSP0 : 2), Partition 0_1_2: (SSP1 : 3), Partition 0_0_4: (SSP0 : 4), Partition 0_2_4: (SSP1 : 6) - * looking at these map and knowing that offsets are monotonically increasing, it is clear that last deploy was with elasticity factor = 4 - * to get checkpoint for Partition 0_0_2, we need to consider last deploy's offsets. - * picking 6 (offset for Partition 0_2_4) means that 0_0_2 will start proc from 6 but offset 5 was never processed. 
- * hence we need to take min of offsets within an elasticity factor. - * - * Given checkpoints for all the tasks in the checkpoint stream, - * computing the last proc offset for an ssp checkpoint for a task, - * the following needs to be met. - * 1. Ancestors: we need to take largest offset among ancestors for an ssp - * 2. Descendants: - * a. group descendants by their elasticityFactor. - * b. among descendants of the same elasticityFactor, take the smallest offset for an ssp - * c. once step b is done, we have (elasticityFactor : smallest-offset-for-ssp) set, pick the largest in this set - * 3. Pick the larger among the offsets received from step 1 (for ancestors) and step 2 (for descendants) - * - * @param taskName - * @param taskSSPSet - * @param checkpointMap - * @param systemAdmins - * @return - */ - public static Map<SystemStreamPartition, String> computeLastProcessedOffsetsFromCheckpointMap( - TaskName taskName, - Set<SystemStreamPartition> taskSSPSet, - Map<TaskName, Checkpoint> checkpointMap, - SystemAdmins systemAdmins) { - Pair<Set<Checkpoint>, Map<Integer, Set<Checkpoint>>> acnestorsAndDescendantsFound = - getAncestorAndDescendantCheckpoints(taskName, checkpointMap); - Set<Checkpoint> ancestorCheckpoints = acnestorsAndDescendantsFound.getLeft(); - Map<Integer, Set<Checkpoint>> descendantCheckpoints = acnestorsAndDescendantsFound.getRight(); - - Map<SystemStreamPartition, String> taskSSPOffsets = new HashMap<>(); - - taskSSPSet.forEach(ssp_withKeyBucket -> { - log.info("for taskName {} and ssp of the task {}, finding its last proc offset", taskName, ssp_withKeyBucket); - - SystemStreamPartition ssp = new SystemStreamPartition(ssp_withKeyBucket.getSystemStream(), - ssp_withKeyBucket.getPartition()); - - SystemAdmin systemAdmin = systemAdmins.getSystemAdmin(ssp.getSystem()); - - String currentLastOffsetForSSP = null; - - String ancestorLastOffsetForSSP = getMaxOffsetForSSPInCheckpointSet(ancestorCheckpoints, ssp, systemAdmin); - - log.info("for taskName {} and ssp {} got lastoffset from ancestors as {}", - taskName, ssp_withKeyBucket, ancestorLastOffsetForSSP); - - String descendantLastOffsetForSSP = descendantCheckpoints.entrySet().stream() - .map(entry -> getMinOffsetForSSPInCheckpointSet(entry.getValue(), ssp, systemAdmin)) // at each ef level, find min offset - .sorted((offset1, offset2) -> systemAdmin.offsetComparator(offset2, offset1)) //confirm reverse sort - aka largest offset first - .filter(Objects::nonNull) - .findFirst().orElse(null); - - log.info("for taskName {} and ssp {} got lastoffset from descendants as {}", - taskName, ssp_withKeyBucket, descendantLastOffsetForSSP); - - Integer offsetComparison = systemAdmin.offsetComparator(ancestorLastOffsetForSSP, descendantLastOffsetForSSP); - if (descendantLastOffsetForSSP == null || (offsetComparison != null && offsetComparison > 0)) { // means ancestorLastOffsetForSSP > descendantLastOffsetForSSP - currentLastOffsetForSSP = ancestorLastOffsetForSSP; - } else { - currentLastOffsetForSSP = descendantLastOffsetForSSP; - } - if (currentLastOffsetForSSP == null) { - log.info("for taskName {} and ssp {} got lastoffset as null. 
" - + "skipping adding this ssp to task's offsets loaded from previous checkpoint", taskName, ssp_withKeyBucket); - } else { - log.info("for taskName {} and ssp {} got lastoffset as {}", taskName, ssp_withKeyBucket, currentLastOffsetForSSP); - taskSSPOffsets.put(ssp_withKeyBucket, currentLastOffsetForSSP); - } - }); - - String checkpointStr = taskSSPOffsets.entrySet().stream() - .map(k -> k.getKey() + " : " + k.getValue()) - .collect(Collectors.joining(", ", "{", "}")); - log.info("for taskName {}, returning checkpoint as {}", taskName, checkpointStr); - return taskSSPOffsets; - } - - public static boolean wasElasticityEnabled(Map<TaskName, Checkpoint> checkpointMap) { - return checkpointMap.keySet().stream() - .filter(ElasticityUtils::isTaskNameElastic) // true if the taskName has elasticityFactor in it - .findFirst().isPresent(); - } -} diff --git a/samza-core/src/main/java/org/apache/samza/elasticity/util/TaskNameComponents.java b/samza-core/src/main/java/org/apache/samza/elasticity/util/TaskNameComponents.java deleted file mode 100644 index 4b76b846f..000000000 --- a/samza-core/src/main/java/org/apache/samza/elasticity/util/TaskNameComponents.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.elasticity.util; - -/** - * POJO class to store system, stream, partition, and keyBucket information associated with a Task, - * that is encoded in the task's name. 
- */ -public class TaskNameComponents { - - public static final int DEFAULT_KEY_BUCKET = 0; - public static final int DEFAULT_ELASTICITY_FACTOR = 1; - public static final int INVALID_PARTITION = -1; - - public final String system; - public final String stream; - public final int partition; - public final int keyBucket; - public final int elasticityFactor; - - public TaskNameComponents(int partition) { - this(partition, DEFAULT_KEY_BUCKET, DEFAULT_ELASTICITY_FACTOR); - } - - public TaskNameComponents(int partition, int keyBucket, int elasticityFactor) { - this("", "", partition, keyBucket, elasticityFactor); - } - - public TaskNameComponents(String system, String stream, int partition) { - this(system, stream, partition, DEFAULT_KEY_BUCKET, DEFAULT_ELASTICITY_FACTOR); - } - - public TaskNameComponents(String system, String stream, int partition, int keyBucket, int elasticityFactor) { - this.system = system; - this.stream = stream; - this.partition = partition; - this.keyBucket = keyBucket; - this.elasticityFactor = elasticityFactor; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (!(o instanceof TaskNameComponents)) return false; - - TaskNameComponents that = (TaskNameComponents) o; - - if (!(this.system.equals(that.system)) - || !(this.stream.equals(that.stream)) - || (this.partition != that.partition) - || (this.keyBucket != that.keyBucket) - || (this.elasticityFactor != that.elasticityFactor)) { - return false; - } - return true; - } - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + system.hashCode(); - result = prime * result + stream.hashCode(); - result = prime * result + partition; - result = prime * result + keyBucket; - result = prime * result + elasticityFactor; - return result; - } -} diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java index 3470fd855..e04edeef5 100644 --- a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java +++ b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java @@ -231,9 +231,10 @@ public class SamzaObjectMapper { public Object deserializeKey(String sspString, DeserializationContext ctxt) throws IOException { String[] parts = sspString.split("\\."); if (parts.length < 3) { - throw new IllegalArgumentException("System stream partition expected in format 'system.stream.partition' "); + throw new IllegalArgumentException("System stream partition expected in format 'system.stream.partition"); } - return new SystemStreamPartition(new SystemStream(parts[0], parts[1]), new Partition(Integer.parseInt(parts[2]))); + return new SystemStreamPartition( + new SystemStream(parts[0], parts[1]), new Partition(Integer.parseInt(parts[2]))); } } diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala index 311dc6a56..48b681ea4 100644 --- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala @@ -22,13 +22,12 @@ package org.apache.samza.checkpoint import java.util import java.util.HashMap import java.util.concurrent.ConcurrentHashMap + import org.apache.commons.lang3.StringUtils import org.apache.samza.SamzaException import org.apache.samza.annotation.InterfaceStability -import 
org.apache.samza.checkpoint.OffsetManager.info -import org.apache.samza.config.{Config, JobConfig, StreamConfig, SystemConfig} +import org.apache.samza.config.{Config, StreamConfig, SystemConfig} import org.apache.samza.container.TaskName -import org.apache.samza.elasticity.util.ElasticityUtils import org.apache.samza.startpoint.{Startpoint, StartpointManager} import org.apache.samza.system.SystemStreamMetadata.OffsetType import org.apache.samza.system._ @@ -106,9 +105,7 @@ object OffsetManager extends Logging { // Build OffsetSetting so we can create a map for OffsetManager. (systemStream, OffsetSetting(systemStreamMetadata, defaultOffsetType, resetOffset)) }.toMap - - new OffsetManager(offsetSettings, checkpointManager, startpointManager, systemAdmins, checkpointListeners, - offsetManagerMetrics) + new OffsetManager(offsetSettings, checkpointManager, startpointManager, systemAdmins, checkpointListeners, offsetManagerMetrics) } } @@ -215,26 +212,16 @@ class OffsetManager( * Set the last processed offset for a given SystemStreamPartition. */ def update(taskName: TaskName, systemStreamPartition: SystemStreamPartition, offset: String) { - // without elasticity enabled, there is exactly one entry of an ssp in the systemStreamPartitions map for a taskName - // with elasticity enabled, there is exactly one of the keyBuckets of an ssp that a task consumes - // and hence exactly one entry of an ssp with keyBucket in in the systemStreamPartitions map for a taskName - // hence from the given ssp, find its sspWithKeybucket for the task and use that for updating lastProcessedOffsets - val sspWithKeyBucket = systemStreamPartitions.getOrElse(taskName, - throw new SamzaException("No SSPs registered for task: " + taskName)) - .filter(ssp => ssp.getSystemStream.equals(systemStreamPartition.getSystemStream) - && ssp.getPartition.equals(systemStreamPartition.getPartition)) - .toIterator.next() - lastProcessedOffsets.putIfAbsent(taskName, new ConcurrentHashMap[SystemStreamPartition, String]()) taskSSPsWithProcessedOffsetUpdated.putIfAbsent(taskName, new ConcurrentHashMap[SystemStreamPartition, Boolean]()) if (offset != null) { if (!offset.equals(IncomingMessageEnvelope.END_OF_STREAM_OFFSET)) { - lastProcessedOffsets.get(taskName).put(sspWithKeyBucket, offset) + lastProcessedOffsets.get(taskName).put(systemStreamPartition, offset) } // Record the spp that have received the new messages. The startpoint for each ssp should only be deleted when the // ssp has received the new messages. More details in SAMZA-2749. - taskSSPsWithProcessedOffsetUpdated.get(taskName).putIfAbsent(sspWithKeyBucket, true) + taskSSPsWithProcessedOffsetUpdated.get(taskName).putIfAbsent(systemStreamPartition, true) } } @@ -500,19 +487,12 @@ class OffsetManager( val checkpoint = checkpointManager.readLastCheckpoint(taskName) - val checkpointMap = checkpointManager.readAllCheckpoints() - if (!ElasticityUtils.wasElasticityEnabled(checkpointMap)) { - if (checkpoint != null) { - return Map(taskName -> checkpoint.getOffsets.asScala.toMap) - } else { - info("Did not receive a checkpoint for taskName %s. Proceeding without a checkpoint." format taskName) - return Map(taskName -> Map()) - } + if (checkpoint != null) { + Map(taskName -> checkpoint.getOffsets.asScala.toMap) + } else { + info("Did not receive a checkpoint for taskName %s. Proceeding without a checkpoint." format taskName) + Map(taskName -> Map()) } - info("There was elasticity enabled in one of the previous deploys." 
+ - "Last processed offsets computation at container start will use elasticity checkpoints if available.") - Map(taskName -> ElasticityUtils.computeLastProcessedOffsetsFromCheckpointMap(taskName, - systemStreamPartitions.get(taskName).get.asJava, checkpointMap, systemAdmins).asScala) } /** diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala index 606f32ace..75f4a8d8f 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala @@ -226,7 +226,7 @@ class TaskInstance( // if elasticity is enabled aka elasticity factor > 1 // then this TaskInstance processes only those envelopes whose key falls // within the keyBucket of the SSP assigned to the task. - val incomingMessageSsp = getIncomingMessageSSP(envelope) + val incomingMessageSsp = envelope.getSystemStreamPartition(elasticityFactor) if (!ssp2CaughtupMapping.getOrElse(incomingMessageSsp, throw new SamzaException(incomingMessageSsp + " is not registered!"))) { @@ -566,7 +566,7 @@ class TaskInstance( // if elasticity is enabled aka elasticity factor > 1 // then this TaskInstance handles only those envelopes whose key falls // within the keyBucket of the SSP assigned to the task. - var incomingMessageSsp = getIncomingMessageSSP(envelope) + val incomingMessageSsp = envelope.getSystemStreamPartition(elasticityFactor) if (IncomingMessageEnvelope.END_OF_STREAM_OFFSET.equals(envelope.getOffset)) { ssp2CaughtupMapping(incomingMessageSsp) = true @@ -629,28 +629,4 @@ class TaskInstance( startingOffset } - - private def getIncomingMessageSSP(envelope: IncomingMessageEnvelope): SystemStreamPartition = { - if (elasticityFactor <= 1) { - return envelope.getSystemStreamPartition - } - // if elasticityFactor > 1, find the SSP with keyBucket - var incomingMessageSsp = envelope.getSystemStreamPartition(elasticityFactor) - - // if envelope is end of stream or watermark or drain, - // it needs to be routed to all tasks consuming the ssp irresp of keyBucket - val messageType = MessageType.of(envelope.getMessage) - if (envelope.isEndOfStream() - || envelope.isDrain() - || messageType == MessageType.END_OF_STREAM - || messageType == MessageType.WATERMARK) { - - incomingMessageSsp = systemStreamPartitions - .filter(ssp => ssp.getSystemStream.equals(incomingMessageSsp.getSystemStream) - && ssp.getPartition.equals(incomingMessageSsp.getPartition)) - .toIterator.next() - debug("for watermark or end-of-stream or drain envelope, found incoming ssp as {}".format(incomingMessageSsp)) - } - incomingMessageSsp - } } diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointV1Serde.scala b/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointV1Serde.scala index e69a97300..ba4226484 100644 --- a/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointV1Serde.scala +++ b/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointV1Serde.scala @@ -40,7 +40,7 @@ class CheckpointV1Serde extends Serde[CheckpointV1] with Logging { // Serialize checkpoint as maps keyed by the SSP.toString() to the another map of the constituent SSP components // and offset. Jackson can't automatically serialize the SSP since it's not a POJO and this avoids // having to wrap it another class while maintaining readability. 
- // { "SSP.toString()" -> {"system": system, "stream": stream, "partition": partition, "keyBucket": keyBucket, "offset": offset)} + // { "SSP.toString()" -> {"system": system, "stream": stream, "partition": partition, "offset": offset)} def fromBytes(bytes: Array[Byte]): CheckpointV1 = { try { val jMap = jsonMapper.readValue(bytes, classOf[util.HashMap[String, util.HashMap[String, String]]]) @@ -55,6 +55,7 @@ class CheckpointV1Serde extends Serde[CheckpointV1] with Logging { require(partition != null, "Partition must be present in JSON-encoded SystemStreamPartition") val offset = sspInfo.get("offset") // allow null offsets, e.g. for changelog ssps + new SystemStreamPartition(system, stream, new Partition(partition.toInt)) -> offset } diff --git a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java index f56af2937..a7458db80 100644 --- a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java +++ b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java @@ -707,16 +707,6 @@ public class TestJobConfig { } assertTrue(exceptionCaught); - jobConfig = - new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_ELASTICITY_FACTOR, Integer.toString(17)))); - exceptionCaught = false; - try { - jobConfig.getElasticityFactor(); - } catch (ConfigException e) { - exceptionCaught = true; - } - assertTrue(exceptionCaught); - jobConfig = new JobConfig(new MapConfig()); assertEquals(JobConfig.DEFAULT_JOB_ELASTICITY_FACTOR, jobConfig.getElasticityFactor()); } diff --git a/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java b/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java index e58497483..16ef93de1 100644 --- a/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java +++ b/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java @@ -224,39 +224,6 @@ public class TestRunLoop { assertEquals(1, containerMetrics.processes().getCount()); // only envelope00 and not envelope01 and not end of stream } - @Test - public void testEndOfStreamElasticityEnabled() { - - TaskName taskName0 = new TaskName(p0.toString() + " 0"); - TaskName taskName1 = new TaskName(p0.toString() + " 1"); - SystemStreamPartition ssp = new SystemStreamPartition("testSystem", "testStreamA", p0); - SystemStreamPartition ssp0 = new SystemStreamPartition("testSystem", "testStreamA", p0, 0); - SystemStreamPartition ssp1 = new SystemStreamPartition("testSystem", "testStreamA", p0, 1); - - // create EOS IME such that its ssp keybucket maps to ssp0 and not to ssp1 - // task in the runloop should give this ime to both it tasks - IncomingMessageEnvelope envelopeEOS = spy(IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp)); - when(envelopeEOS.getSystemStreamPartition(2)).thenReturn(ssp0); - - - // two task in the run loop that processes ssp0 -> 0th keybucket of ssp and ssp1 -> 1st keybucket of ssp - // EOS ime should be given to both the tasks irresp of the keybucket - RunLoopTask task0 = getMockRunLoopTask(taskName0, ssp0); - RunLoopTask task1 = getMockRunLoopTask(taskName1, ssp1); - - SystemConsumers consumerMultiplexer = mock(SystemConsumers.class); - when(consumerMultiplexer.choose(false)).thenReturn(envelopeEOS).thenReturn(null); - - Map<TaskName, RunLoopTask> tasks = ImmutableMap.of(taskName0, task0, taskName1, task1); - int maxMessagesInFlight = 1; - RunLoop runLoop = new RunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs, - 
callbackTimeoutMs, maxThrottlingDelayMs, 0, containerMetrics, () -> 0L, false, 2, null); - runLoop.run(); - - verify(task0).endOfStream(any()); - verify(task1).endOfStream(any()); - } - @Test public void testDrainWithElasticityEnabled() { TaskName taskName0 = new TaskName(p0.toString() + " 0"); diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java b/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java index a82a9a181..e99e3f237 100644 --- a/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java +++ b/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java @@ -93,12 +93,12 @@ public class TestGroupByPartition { GroupByPartition grouper = new GroupByPartition(config); Map<TaskName, Set<SystemStreamPartition>> result = grouper.group(ImmutableSet.of(aa0, aa1, aa2, ab1, ab2, ac0)); Map<TaskName, Set<SystemStreamPartition>> expectedResult = ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder() - .put(new TaskName("Partition 0_0_2"), ImmutableSet.of(new SystemStreamPartition(aa0, 0), new SystemStreamPartition(ac0, 0))) - .put(new TaskName("Partition 0_1_2"), ImmutableSet.of(new SystemStreamPartition(aa0, 1), new SystemStreamPartition(ac0, 1))) - .put(new TaskName("Partition 1_0_2"), ImmutableSet.of(new SystemStreamPartition(aa1, 0), new SystemStreamPartition(ab1, 0))) - .put(new TaskName("Partition 1_1_2"), ImmutableSet.of(new SystemStreamPartition(aa1, 1), new SystemStreamPartition(ab1, 1))) - .put(new TaskName("Partition 2_0_2"), ImmutableSet.of(new SystemStreamPartition(aa2, 0), new SystemStreamPartition(ab2, 0))) - .put(new TaskName("Partition 2_1_2"), ImmutableSet.of(new SystemStreamPartition(aa2, 1), new SystemStreamPartition(ab2, 1))) + .put(new TaskName("Partition 0 0"), ImmutableSet.of(new SystemStreamPartition(aa0, 0), new SystemStreamPartition(ac0, 0))) + .put(new TaskName("Partition 0 1"), ImmutableSet.of(new SystemStreamPartition(aa0, 1), new SystemStreamPartition(ac0, 1))) + .put(new TaskName("Partition 1 0"), ImmutableSet.of(new SystemStreamPartition(aa1, 0), new SystemStreamPartition(ab1, 0))) + .put(new TaskName("Partition 1 1"), ImmutableSet.of(new SystemStreamPartition(aa1, 1), new SystemStreamPartition(ab1, 1))) + .put(new TaskName("Partition 2 0"), ImmutableSet.of(new SystemStreamPartition(aa2, 0), new SystemStreamPartition(ab2, 0))) + .put(new TaskName("Partition 2 1"), ImmutableSet.of(new SystemStreamPartition(aa2, 1), new SystemStreamPartition(ab2, 1))) .build(); assertEquals(expectedResult, result); diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java b/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java index 90628b307..13b7678c7 100644 --- a/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java +++ b/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java @@ -84,22 +84,14 @@ public class TestGroupBySystemStreamPartition { Map<TaskName, Set<SystemStreamPartition>> result = grouper.group(ImmutableSet.of(aa0, aa1, aa2, ac0)); Map<TaskName, Set<SystemStreamPartition>> expectedResult = ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder() - .put(new TaskName(new SystemStreamPartition(aa0, 0).toString() + "_2"), - ImmutableSet.of(new SystemStreamPartition(aa0, 0))) - .put(new 
TaskName(new SystemStreamPartition(aa0, 1).toString() + "_2"), - ImmutableSet.of(new SystemStreamPartition(aa0, 1))) - .put(new TaskName(new SystemStreamPartition(aa1, 0).toString() + "_2"), - ImmutableSet.of(new SystemStreamPartition(aa1, 0))) - .put(new TaskName(new SystemStreamPartition(aa1, 1).toString() + "_2"), - ImmutableSet.of(new SystemStreamPartition(aa1, 1))) - .put(new TaskName(new SystemStreamPartition(aa2, 0).toString() + "_2"), - ImmutableSet.of(new SystemStreamPartition(aa2, 0))) - .put(new TaskName(new SystemStreamPartition(aa2, 1).toString() + "_2"), - ImmutableSet.of(new SystemStreamPartition(aa2, 1))) - .put(new TaskName(new SystemStreamPartition(ac0, 0).toString() + "_2"), - ImmutableSet.of(new SystemStreamPartition(ac0, 0))) - .put(new TaskName(new SystemStreamPartition(ac0, 1).toString() + "_2"), - ImmutableSet.of(new SystemStreamPartition(ac0, 1))) + .put(new TaskName(new SystemStreamPartition(aa0, 0).toString()), ImmutableSet.of(new SystemStreamPartition(aa0, 0))) + .put(new TaskName(new SystemStreamPartition(aa0, 1).toString()), ImmutableSet.of(new SystemStreamPartition(aa0, 1))) + .put(new TaskName(new SystemStreamPartition(aa1, 0).toString()), ImmutableSet.of(new SystemStreamPartition(aa1, 0))) + .put(new TaskName(new SystemStreamPartition(aa1, 1).toString()), ImmutableSet.of(new SystemStreamPartition(aa1, 1))) + .put(new TaskName(new SystemStreamPartition(aa2, 0).toString()), ImmutableSet.of(new SystemStreamPartition(aa2, 0))) + .put(new TaskName(new SystemStreamPartition(aa2, 1).toString()), ImmutableSet.of(new SystemStreamPartition(aa2, 1))) + .put(new TaskName(new SystemStreamPartition(ac0, 0).toString()), ImmutableSet.of(new SystemStreamPartition(ac0, 0))) + .put(new TaskName(new SystemStreamPartition(ac0, 1).toString()), ImmutableSet.of(new SystemStreamPartition(ac0, 1))) .build(); assertEquals(expectedResult, result); diff --git a/samza-core/src/test/java/org/apache/samza/elasticity/util/TestElasticityUtils.java b/samza-core/src/test/java/org/apache/samza/elasticity/util/TestElasticityUtils.java deleted file mode 100644 index dc6bfbb93..000000000 --- a/samza-core/src/test/java/org/apache/samza/elasticity/util/TestElasticityUtils.java +++ /dev/null @@ -1,435 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.samza.elasticity.util; - -import com.google.common.collect.ImmutableMap; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.samza.Partition; -import org.apache.samza.checkpoint.Checkpoint; -import org.apache.samza.checkpoint.CheckpointId; -import org.apache.samza.checkpoint.CheckpointV2; -import org.apache.samza.container.TaskName; -import org.apache.samza.system.SystemAdmin; -import org.apache.samza.system.SystemAdmins; -import org.apache.samza.system.SystemStreamPartition; -import org.junit.Assert; -import org.junit.Test; -import org.mockito.Mockito; - - -// #TODO: going to make this entire class parametrized. -public class TestElasticityUtils { - private static final TaskName TASKNAME_GROUP_BY_PARTITION = new TaskName("Partition 0"); - private static final TaskName ELASTIC_TASKNAME_GROUP_BY_PARTITION = new TaskName("Partition 0_1_2"); - private static final TaskName TASKNAME_GROUP_BY_SSP = new TaskName("SystemStreamPartition [systemA, streamB, 0]"); - private static final TaskName ELASTIC_TASKNAME_GROUP_BY_SSP = new TaskName("SystemStreamPartition [systemA, streamB, 0, 1]_2"); - - @Test - public void testComputeLastProcessedOffsetsFromCheckpointMap() { - // Setup : - // there is one ssp = SystemStreamPartition [systemA, streamB, partition(0)] consumed by the job - // Note: Partition 0_1_2 means task consumes keyBucket 1 of partition 0 and has elasticityFactor 2. - // Before elasticity, job has one task with name "Partition 0" - // with elasticity factor 2, job has 2 tasks with names "Partition 0_0_2" and "Partition 0_1_2" - // Partition 0_0_2 consumes SSP[systemA, stream B, partition(0), keyBucket(0)] - // Partition 0_1_2 consumes SSP[systemA, stream B, partition(0), keyBucket(1)] - // with elasticity factor 4, job has 4 tasks with names "Partition 0_0_4", "Partition 0_1_4", "Partition 0_2_4" and "Partition 0_3_4" - // Partition 0_0_4 consumes SSP[systemA, stream B, partition(0), keyBucket(0)] - // Partition 0_1_4 consumes SSP[systemA, stream B, partition(0), keyBucket(1)] - // Partition 0_2_4 consumes SSP[systemA, stream B, partition(0), keyBucket(2)] - // Partition 0_3_4 consumes SSP[systemA, stream B, partition(0), keyBucket(3)] - - // - // From the definition of keyBucket computation using elasticity factor in - // {@link IncomingMessageEnvelope.getSystemStresamPartition(elasticityFactor) as - // keyBucket = (Math.abs(envelopeKeyorOffset.hashCode()) % 31) % elasticityFactor - // messages processed by 0_0_4 and 0_2_4 will be the same as those processed by 0_0_2 - // messages processed by 0_1_4 and 0_3_4 will be the same as those processed by 0_1_2 - // messages processed by 0_0_2 and 0_1_2 will be the same as those processed by Partition 0 itself - - TaskName taskName = new TaskName("Partition 0_0_2"); - Map<TaskName, Checkpoint> checkpointMap = new HashMap<>(); - SystemStreamPartition ssp = new SystemStreamPartition("systemA", "streamB", new Partition(0)); - SystemStreamPartition ssp0 = new SystemStreamPartition("systemA", "streamB", new Partition(0), 0); - SystemStreamPartition ssp2 = new SystemStreamPartition("systemA", "streamB", new Partition(0), 2); - - - SystemAdmin mockSystemAdmin = Mockito.mock(SystemAdmin.class); - // offsets ordering 1 < 2 < 3 < 4 - Mockito.when(mockSystemAdmin.offsetComparator("1", "2")).thenReturn(-1); - Mockito.when(mockSystemAdmin.offsetComparator("2", 
"1")).thenReturn(1); - Mockito.when(mockSystemAdmin.offsetComparator("1", "3")).thenReturn(-1); - Mockito.when(mockSystemAdmin.offsetComparator("3", "1")).thenReturn(1); - Mockito.when(mockSystemAdmin.offsetComparator("1", "4")).thenReturn(-1); - Mockito.when(mockSystemAdmin.offsetComparator("4", "1")).thenReturn(1); - Mockito.when(mockSystemAdmin.offsetComparator("2", "3")).thenReturn(-1); - Mockito.when(mockSystemAdmin.offsetComparator("3", "2")).thenReturn(1); - Mockito.when(mockSystemAdmin.offsetComparator("2", "4")).thenReturn(-1); - Mockito.when(mockSystemAdmin.offsetComparator("4", "2")).thenReturn(1); - Mockito.when(mockSystemAdmin.offsetComparator("3", "4")).thenReturn(-1); - Mockito.when(mockSystemAdmin.offsetComparator("4", "3")).thenReturn(1); - - SystemAdmins mockSystemAdmins = Mockito.mock(SystemAdmins.class); - Mockito.when(mockSystemAdmins.getSystemAdmin(ssp0.getSystem())).thenReturn(mockSystemAdmin); - - // case 1: for task Partition 0_0_2: last deploy was with ef = 2 itself. - // hence "Partition 0_0_2" has the largest offset and that should be used for computing checkpoint for 0_0_2 now also - checkpointMap.put(new TaskName("Partition 0"), buildCheckpointV2(ssp, "1")); - checkpointMap.put(new TaskName("Partition 0_0_2"), buildCheckpointV2(ssp0, "4")); - checkpointMap.put(new TaskName("Partition 0_0_4"), buildCheckpointV2(ssp0, "2")); - checkpointMap.put(new TaskName("Partition 0_2_4"), buildCheckpointV2(ssp2, "3")); - Map<SystemStreamPartition, String> result = ElasticityUtils.computeLastProcessedOffsetsFromCheckpointMap( - taskName, Collections.singleton(ssp0), checkpointMap, mockSystemAdmins); - Assert.assertEquals("4", result.get(ssp0)); - - // case 2: for task Partition 0_0_2: last deploy was with ef =1 - // hence "Partition 0" has the largest offset. Computing checkpint for 0_0_2 should use this largest offset - checkpointMap = new HashMap<>(); - checkpointMap.put(new TaskName("Partition 0"), buildCheckpointV2(ssp, "4")); - checkpointMap.put(new TaskName("Partition 0_0_2"), buildCheckpointV2(ssp0, "1")); - checkpointMap.put(new TaskName("Partition 0_0_4"), buildCheckpointV2(ssp0, "3")); - checkpointMap.put(new TaskName("Partition 0_2_4"), buildCheckpointV2(ssp2, "2")); - - - result = ElasticityUtils.computeLastProcessedOffsetsFromCheckpointMap( - taskName, Collections.singleton(ssp0), checkpointMap, mockSystemAdmins); - Assert.assertEquals("4", result.get(ssp0)); - - - // case 3: for task partition 0_0_2: last deploy was with ef = 4 - // hence checkpoints of Partition 0_0_4 and Partition 0_3_4 are relevant. 
- // since messages from both end up in 0_0_2 with ef=2, need to take min of their checkpointed offsets - - checkpointMap.put(new TaskName("Partition 0"), buildCheckpointV2(ssp, "1")); - checkpointMap.put(new TaskName("Partition 0_0_2"), buildCheckpointV2(ssp0, "2")); - checkpointMap.put(new TaskName("Partition 0_0_4"), buildCheckpointV2(ssp0, "3")); - checkpointMap.put(new TaskName("Partition 0_2_4"), buildCheckpointV2(ssp2, "4")); - result = ElasticityUtils.computeLastProcessedOffsetsFromCheckpointMap( - taskName, Collections.singleton(ssp0), checkpointMap, mockSystemAdmins); - Assert.assertEquals("3", result.get(ssp0)); - } - @Test - public void testComputeLastProcessedOffsetsWithEdgeCases() { - TaskName taskName = new TaskName("Partition 0_0_2"); - Map<TaskName, Checkpoint> checkpointMap = new HashMap<>(); - SystemStreamPartition ssp0 = new SystemStreamPartition("systemA", "streamB", new Partition(0), 0); - - SystemAdmin mockSystemAdmin = Mockito.mock(SystemAdmin.class); - SystemAdmins mockSystemAdmins = Mockito.mock(SystemAdmins.class); - Mockito.when(mockSystemAdmins.getSystemAdmin(ssp0.getSystem())).thenReturn(mockSystemAdmin); - - // case 1: empty checkpoint map - Map<SystemStreamPartition, String> result = ElasticityUtils.computeLastProcessedOffsetsFromCheckpointMap( - taskName, Collections.singleton(ssp0), checkpointMap, mockSystemAdmins); - Assert.assertTrue("if given checkpoint map is empty, return empty last processed offsets map", result.isEmpty()); - - // case 2: null checkpoints given for some ancestor tasks - checkpointMap.put(new TaskName("Partition 0"), null); - result = ElasticityUtils.computeLastProcessedOffsetsFromCheckpointMap( - taskName, Collections.singleton(ssp0), checkpointMap, mockSystemAdmins); - Assert.assertTrue("if given checkpoint map has null checkpoint, return empty last processed offsets map", result.isEmpty()); - - // case 3: null checkpoints given for some descendant tasks - checkpointMap.put(new TaskName("Partition 0_0_4"), null); - result = ElasticityUtils.computeLastProcessedOffsetsFromCheckpointMap( - taskName, Collections.singleton(ssp0), checkpointMap, mockSystemAdmins); - Assert.assertTrue("if given checkpoint map has null checkpoint, return empty last processed offsets map", result.isEmpty()); - } - - @Test - public void testTaskIsGroupByPartitionOrGroupBySSP() { - String msgPartition = "GroupByPartition task should start with Partition"; - String msgSsp = "GroupBySystemStreamPartition task should start with SystemStreamPartition"; - - Assert.assertTrue(msgPartition, ElasticityUtils.isGroupByPartitionTask(TASKNAME_GROUP_BY_PARTITION)); - Assert.assertFalse(msgPartition, ElasticityUtils.isGroupBySystemStreamPartitionTask(TASKNAME_GROUP_BY_PARTITION)); - - Assert.assertTrue(msgPartition, ElasticityUtils.isGroupByPartitionTask(ELASTIC_TASKNAME_GROUP_BY_PARTITION)); - Assert.assertFalse(msgPartition, ElasticityUtils.isGroupBySystemStreamPartitionTask( - ELASTIC_TASKNAME_GROUP_BY_PARTITION)); - - Assert.assertTrue(msgSsp, ElasticityUtils.isGroupBySystemStreamPartitionTask(TASKNAME_GROUP_BY_SSP)); - Assert.assertFalse(msgSsp, ElasticityUtils.isGroupByPartitionTask(TASKNAME_GROUP_BY_SSP)); - - Assert.assertTrue(msgSsp, ElasticityUtils.isGroupBySystemStreamPartitionTask(ELASTIC_TASKNAME_GROUP_BY_SSP)); - Assert.assertFalse(msgSsp, ElasticityUtils.isGroupByPartitionTask(ELASTIC_TASKNAME_GROUP_BY_SSP)); - - TaskName taskName = new TaskName("FooBar"); - Assert.assertFalse(msgPartition, ElasticityUtils.isGroupByPartitionTask(taskName)); - 
Assert.assertFalse(msgSsp, ElasticityUtils.isGroupBySystemStreamPartitionTask(taskName)); - } - - @Test - public void testIsTaskNameElastic() { - Assert.assertFalse(ElasticityUtils.isTaskNameElastic(TASKNAME_GROUP_BY_SSP)); - Assert.assertTrue(ElasticityUtils.isTaskNameElastic(ELASTIC_TASKNAME_GROUP_BY_SSP)); - Assert.assertFalse(ElasticityUtils.isTaskNameElastic(TASKNAME_GROUP_BY_PARTITION)); - Assert.assertTrue(ElasticityUtils.isTaskNameElastic(ELASTIC_TASKNAME_GROUP_BY_PARTITION)); - } - - @Test - public void testGetElasticTaskNameParts() { - TaskNameComponents taskNameComponents = ElasticityUtils.getTaskNameParts(TASKNAME_GROUP_BY_PARTITION); - Assert.assertEquals(taskNameComponents.partition, 0); - Assert.assertEquals(taskNameComponents.keyBucket, TaskNameComponents.DEFAULT_KEY_BUCKET); - Assert.assertEquals(taskNameComponents.elasticityFactor, TaskNameComponents.DEFAULT_ELASTICITY_FACTOR); - - taskNameComponents = ElasticityUtils.getTaskNameParts(ELASTIC_TASKNAME_GROUP_BY_PARTITION); - Assert.assertEquals(taskNameComponents.partition, 0); - Assert.assertEquals(taskNameComponents.keyBucket, 1); - Assert.assertEquals(taskNameComponents.elasticityFactor, 2); - - taskNameComponents = ElasticityUtils.getTaskNameParts(TASKNAME_GROUP_BY_SSP); - Assert.assertEquals(taskNameComponents.system, "systemA"); - Assert.assertEquals(taskNameComponents.stream, "streamB"); - Assert.assertEquals(taskNameComponents.partition, 0); - Assert.assertEquals(taskNameComponents.keyBucket, TaskNameComponents.DEFAULT_KEY_BUCKET); - Assert.assertEquals(taskNameComponents.elasticityFactor, TaskNameComponents.DEFAULT_ELASTICITY_FACTOR); - - taskNameComponents = ElasticityUtils.getTaskNameParts(ELASTIC_TASKNAME_GROUP_BY_SSP); - Assert.assertEquals(taskNameComponents.system, "systemA"); - Assert.assertEquals(taskNameComponents.stream, "streamB"); - Assert.assertEquals(taskNameComponents.partition, 0); - Assert.assertEquals(taskNameComponents.keyBucket, 1); - Assert.assertEquals(taskNameComponents.elasticityFactor, 2); - - taskNameComponents = ElasticityUtils.getTaskNameParts(new TaskName("FooBar")); - Assert.assertEquals(taskNameComponents.partition, TaskNameComponents.INVALID_PARTITION); - } - - @Test - public void testIsOtherTaskAncestorDescendantOfCurrentTask() { - TaskName task0 = new TaskName("Partition 0"); - TaskName task1 = new TaskName("Partition 1"); - TaskName task002 = new TaskName("Partition 0_0_2"); - TaskName task012 = new TaskName("Partition 0_1_2"); - TaskName task004 = new TaskName("Partition 0_0_4"); - TaskName task014 = new TaskName("Partition 0_1_4"); - TaskName task024 = new TaskName("Partition 0_2_4"); - TaskName task034 = new TaskName("Partition 0_3_4"); - - TaskName sspTask0 = new TaskName("SystemStreamPartition [systemA, streamB, 0]"); - TaskName sspTask002 = new TaskName("SystemStreamPartition [systemA, streamB, 0, 0]_2"); - TaskName sspTask012 = new TaskName("SystemStreamPartition [systemA, streamB, 0, 1]_2"); - TaskName sspTask004 = new TaskName("SystemStreamPartition [systemA, streamB, 0, 0]_4"); - TaskName sspTask014 = new TaskName("SystemStreamPartition [systemA, streamB, 0, 1]_4"); - TaskName sspTask024 = new TaskName("SystemStreamPartition [systemA, streamB, 0, 2]_4"); - TaskName sspTask034 = new TaskName("SystemStreamPartition [systemA, streamB, 0, 3]_4"); - - // Partition 0 is ancestor of all tasks Partition 0_0_2, 0_1_2, 0_0_4, 0_1_4, 0_2_4, 0_3_4 and itself - // and all these tasks are descendants of Partition 0 (except itself) - 
Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task0, task0)); - Assert.assertFalse(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task0, task1)); - Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task002, task0)); - Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task012, task0)); - Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task004, task0)); - Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task014, task0)); - Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task024, task0)); - Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task034, task0)); - - Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task0, task002)); - Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task0, task012)); - Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task0, task004)); - Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task0, task014)); - Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task0, task024)); - Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task0, task034)); - - // Partition 0_0_2 is ancestor of tasks Partition 0_0_4 and 0_2_4 and itself - // these tasks are descendants of 0_0_2 - Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task004, task002)); - Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task024, task002)); - Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task002, task002)); - - Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task002, task004)); - Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task002, task024)); - - // "SystemStreamPartition [systemA, streamB, 0] - // is ancestor of all tasks "SystemStreamPartition [systemA, streamB, 0, 0]_2, [systemA, streamB, 0, 1]_2 and the rest incl itself - // and all these tasks are descendants of Partition 0 (except itself) - Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask0, sspTask0)); - Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask002, sspTask0)); - Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask012, sspTask0)); - Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask004, sspTask0)); - Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask014, sspTask0)); - Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask024, sspTask0)); - Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask034, sspTask0)); - - Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask0, sspTask002)); - Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask0, sspTask012)); - Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask0, sspTask004)); - Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask0, sspTask014)); - Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask0, sspTask024)); - Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask0, sspTask034)); - - // SystemStreamPartition [systemA, streamB, 0, 0]_2 is ancestor of - // tasks SystemStreamPartition [systemA, streamB, 0, 0]_4, SystemStreamPartition [systemA, streamB, 0, 2]_4 and itself - // similarly, these tasks are descendants of SystemStreamPartition [systemA, 
streamB, 0, 0]_2 - Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask004, sspTask002)); - Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask024, sspTask002)); - Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask002, sspTask002)); - - Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask002, sspTask004)); - Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask002, sspTask024)); - } - - @Test - public void testGetAncestorAndDescendantCheckpoints() { - TaskName taskName = new TaskName("Partition 0_0_2"); - Map<TaskName, Checkpoint> checkpointMap = new HashMap<>(); - SystemStreamPartition ssp = new SystemStreamPartition("systemA", "streamB", new Partition(0)); - Checkpoint ansCheckpoint1 = buildCheckpointV2(ssp, "1"); - Checkpoint ansCheckpoint2 = buildCheckpointV2(ssp, "2"); - Checkpoint desCheckpoint1 = buildCheckpointV2(ssp, "3"); - Checkpoint desCheckpoint2 = buildCheckpointV2(ssp, "4"); - Checkpoint unrelCheckpoint = buildCheckpointV2(ssp, "5"); - Set<Checkpoint> ansCheckpointSet = new HashSet<>(Arrays.asList(ansCheckpoint1, ansCheckpoint2)); - Set<Checkpoint> desCheckpointSet = new HashSet<>(Arrays.asList(desCheckpoint1, desCheckpoint2)); - - checkpointMap.put(new TaskName("Partition 0"), ansCheckpoint1); - checkpointMap.put(new TaskName("Partition 0_0_2"), ansCheckpoint2); - checkpointMap.put(new TaskName("Partition 0_0_4"), desCheckpoint1); - checkpointMap.put(new TaskName("Partition 0_2_4"), desCheckpoint2); - checkpointMap.put(new TaskName("Partition 0_1_4"), unrelCheckpoint); - - Pair<Set<Checkpoint>, Map<Integer, Set<Checkpoint>>> result = - ElasticityUtils.getAncestorAndDescendantCheckpoints(taskName, checkpointMap); - Set<Checkpoint> anscestorCheckpointSet = result.getLeft(); - Set<Checkpoint> descendantCheckpointSetForEf4 = result.getRight().get(4); - - Assert.assertTrue("should contain all ancestors' checkpoints", - anscestorCheckpointSet.containsAll(ansCheckpointSet)); - Assert.assertFalse("should not contain a descendant checkpoint in anscetor list", - anscestorCheckpointSet.contains(desCheckpoint1)); - Assert.assertFalse("should not contain an unrelated checkpoint in ancestor list", - anscestorCheckpointSet.contains(unrelCheckpoint)); - - Assert.assertTrue("should contain all descendants' checkpoints", - descendantCheckpointSetForEf4.containsAll(desCheckpointSet)); - Assert.assertFalse("should not contain a anscetor checkpoint in descendant list", - descendantCheckpointSetForEf4.contains(ansCheckpoint1)); - Assert.assertFalse("should not contain an unrelated checkpoint in descendant list", - descendantCheckpointSetForEf4.contains(unrelCheckpoint)); - } - - @Test - public void testGetOffsetForSSPInCheckpoint() { - String offset1 = "1111"; - String offset2 = "2222"; - // case 1: when looking for exact ssp - SystemStreamPartition ssp = new SystemStreamPartition("systemA", "streamB", new Partition(0)); - Checkpoint checkpoint1 = buildCheckpointV2(ssp, offset1); - Assert.assertEquals(ElasticityUtils.getOffsetForSSPInCheckpoint(checkpoint1, ssp), offset1); - - // case 2: checkpoint has ssp with key bucket but looking for the full ssp (same system stream and partition but without keybucket) - SystemStreamPartition sspWithKB = new SystemStreamPartition("systemA", "streamB", new Partition(0), 1); - checkpoint1 = buildCheckpointV2(sspWithKB, offset2); - Assert.assertEquals(ElasticityUtils.getOffsetForSSPInCheckpoint(checkpoint1, ssp), offset2); - - // case 3: try 
getting offset for an ssp not present in the checkpoint -> should return null - SystemStreamPartition ssp2 = new SystemStreamPartition("A", "B", new Partition(1)); - Assert.assertEquals(ElasticityUtils.getOffsetForSSPInCheckpoint(checkpoint1, ssp2), null); - } - - @Test - public void testGetMaxMinOffsetForSSPInCheckpointSet() { - String offset1 = "1111"; - String offset2 = "2222"; - - SystemStreamPartition ssp = new SystemStreamPartition("systemA", "streamB", new Partition(0)); - Checkpoint checkpoint1 = buildCheckpointV2(ssp, offset1); - Checkpoint checkpoint2 = buildCheckpointV2(ssp, offset2); - Set<Checkpoint> checkpointSet = new HashSet<>(Arrays.asList(checkpoint1, checkpoint2)); - - SystemAdmin mockSystemAdmin = Mockito.mock(SystemAdmin.class); - // offset 1 < offset2 - Mockito.when(mockSystemAdmin.offsetComparator(offset1, offset2)).thenReturn(-1); - Mockito.when(mockSystemAdmin.offsetComparator(offset2, offset1)).thenReturn(1); - - // case 1: when exact ssp is in checkpoint set - Assert.assertEquals(offset2, ElasticityUtils.getMaxOffsetForSSPInCheckpointSet(checkpointSet, ssp, mockSystemAdmin)); - Assert.assertEquals(offset1, ElasticityUtils.getMinOffsetForSSPInCheckpointSet(checkpointSet, ssp, mockSystemAdmin)); - - // case 2: when looking for ssp with keyBucket 1 whereas checkpoint set only has full ssp (same system stream and partition but without keybucket) - SystemStreamPartition sspWithKeyBucket = new SystemStreamPartition(ssp, 1); - Assert.assertEquals(offset2, ElasticityUtils.getMaxOffsetForSSPInCheckpointSet(checkpointSet, sspWithKeyBucket, mockSystemAdmin)); - Assert.assertEquals(offset1, ElasticityUtils.getMinOffsetForSSPInCheckpointSet(checkpointSet, sspWithKeyBucket, mockSystemAdmin)); - - - // case 3: when ssp not in checkpoint set -> should receive null for min and max offset - SystemStreamPartition ssp2 = new SystemStreamPartition("A", "B", new Partition(0)); - Assert.assertEquals(null, ElasticityUtils.getMaxOffsetForSSPInCheckpointSet(checkpointSet, ssp2, mockSystemAdmin)); - Assert.assertEquals(null, ElasticityUtils.getMinOffsetForSSPInCheckpointSet(checkpointSet, ssp2, mockSystemAdmin)); - } - - @Test - public void testWasElasticityEnabled() { - Checkpoint checkpoint1 = buildCheckpointV2(new SystemStreamPartition("A", "B", new Partition(0)), "1"); - Checkpoint checkpoint2 = buildCheckpointV2(new SystemStreamPartition("A", "B", new Partition(1)), "2"); - Checkpoint checkpoint3 = buildCheckpointV2(new SystemStreamPartition("A", "B", new Partition(0), 0), "3"); - Checkpoint checkpoint4 = buildCheckpointV2(new SystemStreamPartition("A", "B", new Partition(0), 1), "4"); - - // case 0: empty checkpoint map - Assert.assertFalse(ElasticityUtils.wasElasticityEnabled(new HashMap<>())); - - // case 1: no tasks with elasticity enabled in the checkpoint map - Map<TaskName, Checkpoint> checkpointMap1 = new HashMap<>(); - checkpointMap1.put(new TaskName("Partition 0"), checkpoint1); - checkpointMap1.put(new TaskName("Partition 2"), checkpoint2); - Assert.assertFalse(ElasticityUtils.wasElasticityEnabled(checkpointMap1)); - - // case 2: tasks with no elasticity and tasks with elasticity both present in the checkpoint map - Map<TaskName, Checkpoint> checkpointMap2 = new HashMap<>(); - checkpointMap2.put(new TaskName("Partition 0"), checkpoint1); - checkpointMap2.put(new TaskName("Partition 2"), checkpoint2); - checkpointMap2.put(new TaskName("Partition 0_0_2"), checkpoint3); - Assert.assertTrue(ElasticityUtils.wasElasticityEnabled(checkpointMap2)); - - // case 3: only tasks with 
elasticity present in the checkpoint map - Map<TaskName, Checkpoint> checkpointMap3 = new HashMap<>(); - checkpointMap3.put(new TaskName("Partition 0_0_2"), checkpoint3); - checkpointMap3.put(new TaskName("Partition 0_1_2"), checkpoint4); - Assert.assertTrue(ElasticityUtils.wasElasticityEnabled(checkpointMap3)); - - // case 4: repeat same checks with GroupBySSP grouper tasks - Map<TaskName, Checkpoint> checkpointMap4 = new HashMap<>(); - checkpointMap4.put(new TaskName("SystemStreamPartition [A, B, 0]"), checkpoint1); - checkpointMap4.put(new TaskName("SystemStreamPartition [A, B, 1]"), checkpoint2); - Assert.assertFalse(ElasticityUtils.wasElasticityEnabled(checkpointMap4)); - checkpointMap4.put(new TaskName("SystemStreamPartition [A, B, 0, 0]_2"), checkpoint3); - checkpointMap4.put(new TaskName("SystemStreamPartition [A, B, 0, 1]_2"), checkpoint4); - Assert.assertTrue(ElasticityUtils.wasElasticityEnabled(checkpointMap4)); - - // case 5: repeat same checks with AllSspToSingleTask grouper tasks - no elasticity supported for this grouper - Map<TaskName, Checkpoint> checkpointMap5 = new HashMap<>(); - checkpointMap5.put(new TaskName("Task-0"), checkpoint1); - checkpointMap5.put(new TaskName("Task-1"), checkpoint2); - Assert.assertFalse(ElasticityUtils.wasElasticityEnabled(checkpointMap5)); - } - - private static CheckpointV2 buildCheckpointV2(SystemStreamPartition ssp, String offset) { - return new CheckpointV2(CheckpointId.create(), ImmutableMap.of(ssp, offset), - ImmutableMap.of("backend", ImmutableMap.of("store", "10"))); - } -} diff --git a/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java b/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java index 65832406b..2ddecf07c 100644 --- a/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java +++ b/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java @@ -340,7 +340,7 @@ public class TestSamzaObjectMapper { assertEquals(serializedSSPAsJson.get("partition"), deserSSPAsJson.get("partition")); assertEquals(serializedSSPAsJson.get("keyBucket"), deserSSPAsJson.get("-1")); - //Scenario 2: ssp serialized with new elasticMapper and deserialized by old preElastic Mapper + //Scenario 1: ssp serialized with new elasticMapper and deserialized by old preElastic Mapper SystemStreamPartition sspWithKeyBucket = new SystemStreamPartition("foo", "bar", new Partition(1), 1); serializedString = elasticObjectMapper.writeValueAsString(sspWithKeyBucket); @@ -356,37 +356,6 @@ public class TestSamzaObjectMapper { assertEquals(serializedSSPAsJson.get("system"), deserSSPAsJson.get("system")); assertEquals(serializedSSPAsJson.get("stream"), deserSSPAsJson.get("stream")); assertEquals(serializedSSPAsJson.get("partition"), deserSSPAsJson.get("partition")); - - // scenario 3: ssp as key serialized with preElasticMapper and deserialized by new Mapper with elasticity - Map<SystemStreamPartition, String> offsets = new HashMap<>(); - String offset = "100"; - - String sspmapString = preElasticObjectMapper.writeValueAsString(ImmutableMap.of(ssp, offset)); - - TypeReference<HashMap<SystemStreamPartition, String>> typeRef - = new TypeReference<HashMap<SystemStreamPartition, String>>() { }; - - Map<SystemStreamPartition, String> deserSSPMap = elasticObjectMapper.readValue(sspmapString, typeRef); - SystemStreamPartition deserSSP = deserSSPMap.keySet().stream().findAny().get(); - String deserOffset = deserSSPMap.values().stream().findFirst().get(); - 
assertEquals(ssp.getSystem(), deserSSP.getSystem()); - assertEquals(ssp.getStream(), deserSSP.getStream()); - assertEquals(ssp.getPartition(), deserSSP.getPartition()); - assertEquals(ssp.getKeyBucket(), deserSSP.getKeyBucket()); - assertEquals(offset, deserOffset); - - // Scenario 4: ssp key serialized with new elasticMapper and deserialized by old preElastic Mapper - sspmapString = elasticObjectMapper.writeValueAsString(ImmutableMap.of(sspWithKeyBucket, offset)); - - deserSSPMap = preElasticObjectMapper.readValue(sspmapString, typeRef); - deserSSP = deserSSPMap.keySet().stream().findAny().get(); - deserOffset = deserSSPMap.values().stream().findFirst().get(); - assertEquals(sspWithKeyBucket.getSystem(), deserSSP.getSystem()); - assertEquals(sspWithKeyBucket.getStream(), deserSSP.getStream()); - assertEquals(sspWithKeyBucket.getPartition(), deserSSP.getPartition()); - // preElastic mapper does not know about KeyBucket so dont check for it - assertEquals(offset, deserOffset); - } private JobModel deserializeFromObjectNode(ObjectNode jobModelJson) throws IOException { @@ -440,12 +409,14 @@ public class TestSamzaObjectMapper { private static class OldSystemStreamPartitionKeyDeserializer extends KeyDeserializer { @Override public Object deserializeKey(String sspString, DeserializationContext ctxt) throws IOException { - String[] parts = sspString.split("\\."); - if (parts.length < 3 || parts.length > 4) { - throw new IllegalArgumentException("System stream partition expected in format 'system.stream.partition' " - + "or 'system.stream.partition.keyBucket"); + int idx = sspString.indexOf('.'); + int lastIdx = sspString.lastIndexOf('.'); + if (idx < 0 || lastIdx < 0) { + throw new IllegalArgumentException("System stream partition expected in format 'system.stream.partition"); } - return new SystemStreamPartition(new SystemStream(parts[0], parts[1]), new Partition(Integer.parseInt(parts[2]))); + return new SystemStreamPartition( + new SystemStream(sspString.substring(0, idx), sspString.substring(idx + 1, lastIdx)), + new Partition(Integer.parseInt(sspString.substring(lastIdx + 1)))); } } public static ObjectMapper getOldDeserForSSpKeyObjectMapper() { @@ -483,27 +454,6 @@ public class TestSamzaObjectMapper { } } - private static class PreElasticitySystemStreamPartitionKeySerializer extends JsonSerializer<SystemStreamPartition> { - @Override - public void serialize(SystemStreamPartition ssp, JsonGenerator jgen, SerializerProvider provider) throws IOException { - String sspString = ssp.getSystem() + "." + ssp.getStream() + "." - + String.valueOf(ssp.getPartition().getPartitionId()); - jgen.writeFieldName(sspString); - } - } - - private static class PreElasticitySystemStreamPartitionKeyDeserializer extends KeyDeserializer { - @Override - public Object deserializeKey(String sspString, DeserializationContext ctxt) throws IOException { - String[] parts = sspString.split("\\."); - if (parts.length < 3) { - throw new IllegalArgumentException("System stream partition expected in format 'system.stream.partition"); - } - return new SystemStreamPartition( - new SystemStream(parts[0], parts[1]), new Partition(Integer.parseInt(parts[2]))); - } - } - private static final ObjectMapper OBJECT_MAPPER = getPreEleasticObjectMapper(); public static ObjectMapper getPreEleasticObjectMapper() { ObjectMapper mapper = new ObjectMapper(); @@ -514,13 +464,13 @@ public class TestSamzaObjectMapper { // Setup custom serdes for simple data types. 
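The restored OldSystemStreamPartitionKeyDeserializer hunk above parses the serialized key on the first and last dot instead of splitting on every dot, which also lets a stream name that itself contains dots round-trip. A small standalone sketch of that parsing, assuming a made-up class name and sample key; only the SystemStreamPartition, SystemStream and Partition constructors already used in these tests are relied on:

import org.apache.samza.Partition;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamPartition;

public class SspKeyParseSketch {
  // First dot ends the system, last dot starts the partition id,
  // everything in between is the stream name (dots allowed).
  static SystemStreamPartition parse(String sspString) {
    int idx = sspString.indexOf('.');
    int lastIdx = sspString.lastIndexOf('.');
    if (idx < 0 || lastIdx <= idx) {
      throw new IllegalArgumentException("Expected 'system.stream.partition' but got: " + sspString);
    }
    return new SystemStreamPartition(
        new SystemStream(sspString.substring(0, idx), sspString.substring(idx + 1, lastIdx)),
        new Partition(Integer.parseInt(sspString.substring(lastIdx + 1))));
  }

  public static void main(String[] args) {
    // A stream name containing a dot ("page-views.v2") still parses correctly.
    System.out.println(parse("kafka.page-views.v2.7"));
  }
}
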
module.addSerializer(Partition.class, new SamzaObjectMapper.PartitionSerializer()); module.addSerializer(SystemStreamPartition.class, new PreElasticitySystemStreamPartitionSerializer()); - module.addKeySerializer(SystemStreamPartition.class, new PreElasticitySystemStreamPartitionKeySerializer()); + module.addKeySerializer(SystemStreamPartition.class, new SamzaObjectMapper.SystemStreamPartitionKeySerializer()); module.addSerializer(TaskName.class, new SamzaObjectMapper.TaskNameSerializer()); module.addSerializer(TaskMode.class, new SamzaObjectMapper.TaskModeSerializer()); module.addDeserializer(TaskName.class, new SamzaObjectMapper.TaskNameDeserializer()); module.addDeserializer(Partition.class, new SamzaObjectMapper.PartitionDeserializer()); module.addDeserializer(SystemStreamPartition.class, new PreElasticitySystemStreamPartitionDeserializer()); - module.addKeyDeserializer(SystemStreamPartition.class, new PreElasticitySystemStreamPartitionKeyDeserializer()); + module.addKeyDeserializer(SystemStreamPartition.class, new SamzaObjectMapper.SystemStreamPartitionKeyDeserializer()); module.addDeserializer(Config.class, new SamzaObjectMapper.ConfigDeserializer()); module.addDeserializer(TaskMode.class, new SamzaObjectMapper.TaskModeDeserializer()); module.addSerializer(CheckpointId.class, new SamzaObjectMapper.CheckpointIdSerializer()); diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala index d0039a940..c2d24a988 100644 --- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala +++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala @@ -584,47 +584,6 @@ class TestOffsetManager { assertEquals("60", modifiedOffsets.get(ssp6)) } - @Test - def testElasticityUpdateWithoutKeyBucket: Unit = { - // When elasticity is enabled, task consumes a part of the full SSP represented by SSP With KeyBucket. - // OffsetManager tracks the set of SSP with KeyBucket consumed by a task. - // However, after an IME processing is complete, OffsetManager.update is called without KeyBuket. - // OffsetManager has to find and udpate the last processed offset for the task correctly for its SSP with KeyBucket. - val taskName = new TaskName("c") - val systemStream = new SystemStream("test-system", "test-stream") - val partition = new Partition(0) - val systemStreamPartition = new SystemStreamPartition(systemStream, partition) - val systemStreamPartitionWithKeyBucket = new SystemStreamPartition(systemStreamPartition, 0); - val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")).asJava) - val systemStreamMetadata = Map(systemStream -> testStreamMetadata) - val checkpointManager = getCheckpointManager(systemStreamPartition, taskName) - val startpointManagerUtil = getStartpointManagerUtil() - val systemAdmins = mock(classOf[SystemAdmins]) - when(systemAdmins.getSystemAdmin("test-system")).thenReturn(getSystemAdmin) - val offsetManager = OffsetManager(systemStreamMetadata, new MapConfig, checkpointManager, startpointManagerUtil.getStartpointManager, systemAdmins, Map(), new OffsetManagerMetrics) - // register task and its input SSP with KeyBucket - offsetManager.register(taskName, Set(systemStreamPartitionWithKeyBucket)) - - offsetManager.start - - // update is called with only the full SSP and no keyBucket information. 
- offsetManager.update(taskName, systemStreamPartition, "46") - // Get checkpoint snapshot like we do at the beginning of TaskInstance.commit() - val checkpoint46 = offsetManager.getLastProcessedOffsets(taskName) - offsetManager.update(taskName, systemStreamPartition, "47") // Offset updated before checkpoint - offsetManager.writeCheckpoint(taskName, new CheckpointV1(checkpoint46)) - // OffsetManager correctly updates the lastProcessedOffset and checkpoint for the task and input SSP with KeyBucket. - assertEquals(Some("47"), offsetManager.getLastProcessedOffset(taskName, systemStreamPartitionWithKeyBucket)) - assertEquals("46", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartitionWithKeyBucket).getValue) - - // Now write the checkpoint for the latest offset - val checkpoint47 = offsetManager.getLastProcessedOffsets(taskName) - offsetManager.writeCheckpoint(taskName, new CheckpointV1(checkpoint47)) - - assertEquals(Some("47"), offsetManager.getLastProcessedOffset(taskName, systemStreamPartitionWithKeyBucket)) - assertEquals("47", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartitionWithKeyBucket).getValue) - } - @Test def testStartpointUpdate: Unit = { val taskName = new TaskName("c") diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala index 77b1ee35c..08b082ca2 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala @@ -162,7 +162,7 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar { verify(processesCounter).inc() verify(messagesActuallyProcessedCounter).inc() - // case 2: taskInstance processes the keyBucket=0 of the ssp and envelope is NOT from same keyBucket + // case 1: taskInstance processes the keyBucket=0 of the ssp and envelope is NOT from same keyBucket // taskInstance.process should throw the exception ssp is not registered. 
when(envelope.getSystemStreamPartition(2)).thenReturn(notProcessedSSPKeyBucket) val thrown = intercept[Exception] { @@ -171,26 +171,6 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar { assert(thrown.isInstanceOf[SamzaException]) assert(thrown.getMessage.contains(notProcessedSSPKeyBucket.toString)) assert(thrown.getMessage.contains("is not registered!")) - - // case 3: taskInstance processes the keyBucket=0 of the ssp and envelope is watermark NOT from same keyBucket - // regular processing should happen as Watermark and end of stream should be processed by all tasks - val watermark = spy(IncomingMessageEnvelope.buildWatermarkEnvelope(SYSTEM_STREAM_PARTITION, 1234l)) - when(watermark.getSystemStreamPartition(2)).thenReturn(notProcessedSSPKeyBucket) - this.taskInstance.process(watermark, coordinator, callbackFactory) - assertEquals(2, this.taskInstanceExceptionHandler.numTimesCalled) // case 1 and case 3 - verify(this.task).processAsync(watermark, this.collector, coordinator, callback) - verify(processesCounter, times(3)).inc() // case 1, 2 and 3 - verify(messagesActuallyProcessedCounter, times(2)).inc() // case 1 and 3 - - // case 4: taskInstance processes the keyBucket=0 of the ssp and envelope is EndOfStream NOT from same keyBucket - // regular processing should happen as Watermark and end of stream should be processed by all tasks - val endOfStream = spy(IncomingMessageEnvelope.buildEndOfStreamEnvelope(SYSTEM_STREAM_PARTITION)) - when(endOfStream.getSystemStreamPartition(2)).thenReturn(notProcessedSSPKeyBucket) - this.taskInstance.process(endOfStream, coordinator, callbackFactory) - assertEquals(3, this.taskInstanceExceptionHandler.numTimesCalled) // case 1 and case 3 and case 4 - verify(this.task).processAsync(endOfStream, this.collector, coordinator, callback) - verify(processesCounter, times(4)).inc() // case 1, 2, 3 and 4 - verify(messagesActuallyProcessedCounter, times(3)).inc() // case 1 and 3 and 4 } @Test diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointV1Serde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointV1Serde.scala index bb897983c..02f2e5947 100644 --- a/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointV1Serde.scala +++ b/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointV1Serde.scala @@ -19,11 +19,10 @@ package org.apache.samza.serializers -import com.fasterxml.jackson.databind.ObjectMapper - import java.util + import org.apache.samza.Partition -import org.apache.samza.checkpoint.CheckpointV1 +import org.apache.samza.checkpoint.{CheckpointV1} import org.apache.samza.container.TaskName import org.apache.samza.system.SystemStreamPartition import org.junit.Assert._ @@ -50,81 +49,4 @@ class TestCheckpointV1Serde { val checkpoint = checkpointSerde.fromBytes(checkpointBytes) assertNull(checkpoint) } - - @Test - def testSSPWithKeyBucket { - // case 1: write and read with serde that is aware of KeyBucket within SSP - val serde = new CheckpointV1Serde - var offsets = Map[SystemStreamPartition, String]() - val ssp = new SystemStreamPartition("test-system", "test-stream", - new Partition(777), -1) - offsets += ssp -> "1" - val deserializedOffsets = serde.fromBytes(serde.toBytes(new CheckpointV1(offsets.asJava))) - assertEquals("1", deserializedOffsets.getOffsets.get(ssp)) - assertEquals(1, deserializedOffsets.getOffsets.size) - - // case 2: SSP was serialized by serde not aware of KeyBucket - aka did not put keyBucket into serialized form - val 
deserializedOffsets2 = serde.fromBytes(toBytesWithoutKeyBucket(new CheckpointV1(offsets.asJava))) - assertEquals("1", deserializedOffsets2.getOffsets.get(ssp)) - assertEquals(1, deserializedOffsets2.getOffsets.size) - - // case 3: SSP was serialized by serde aware of KeyBucket but deserialized by serde not aware of KeyBucket - val deserializedOffsets3 = fromBytesWithoutKeyBucket(serde.toBytes(new CheckpointV1(offsets.asJava))) - assertEquals("1", deserializedOffsets3.getOffsets.get(ssp)) - assertEquals(1, deserializedOffsets3.getOffsets.size) - - // case 4: SSP has keyBucket = 0 (aka not default -1) - serialize by serde NOT aware of keyBucket - // when serialized with serde not aware of keyBucket, the info about keyBucket is lost, - // we can only recover the system, stream and partition parts out during deserialization. - // hence after deser, we need to look for SSP without key bucket - val sspWithKeyBucket = new SystemStreamPartition("test-system", "test-stream", - new Partition(777), 0) - val sspWithoutKeyBucket = new SystemStreamPartition("test-system", "test-stream", - new Partition(777)) - var offsets1 = Map[SystemStreamPartition, String]() - offsets1 += sspWithKeyBucket -> "1" - - val deserializedOffsets4 = serde.fromBytes(toBytesWithoutKeyBucket(new CheckpointV1(offsets1.asJava))) - assertEquals("1", deserializedOffsets4.getOffsets.get(sspWithoutKeyBucket)) - assertEquals(1, deserializedOffsets4.getOffsets.size) - - // case 5: SSP has KeyBucket = 0, serialized by serde aware of keyBucket - val deserializedOffsets5 = fromBytesWithoutKeyBucket(serde.toBytes(new CheckpointV1(offsets1.asJava))) - assertEquals("1", deserializedOffsets5.getOffsets.get(sspWithoutKeyBucket)) - assertEquals(1, deserializedOffsets5.getOffsets.size) - } - - private def fromBytesWithoutKeyBucket(bytes: Array[Byte]): CheckpointV1 = { - val jsonMapper = new ObjectMapper() - val jMap = jsonMapper.readValue(bytes, classOf[util.HashMap[String, util.HashMap[String, String]]]) - - def deserializeJSONMap(sspInfo:util.HashMap[String, String]) = { - val system = sspInfo.get("system") - val stream = sspInfo.get("stream") - val partition = sspInfo.get("partition") - val offset = sspInfo.get("offset") - new SystemStreamPartition(system, stream, new Partition(partition.toInt)) -> offset - } - val cpMap = jMap.values.asScala.map(deserializeJSONMap).toMap - new CheckpointV1(cpMap.asJava) - } - - private def toBytesWithoutKeyBucket(checkpoint: CheckpointV1): Array[Byte] = { - val jsonMapper = new ObjectMapper() - val offsets = checkpoint.getOffsets - val asMap = new util.HashMap[String, util.HashMap[String, String]](offsets.size()) - - offsets.asScala.foreach { - case (ssp, offset) => - val jMap = new util.HashMap[String, String](4) - jMap.put("system", ssp.getSystemStream.getSystem) - jMap.put("stream", ssp.getSystemStream.getStream) - jMap.put("partition", ssp.getPartition.getPartitionId.toString) - jMap.put("offset", offset) - - asMap.put(ssp.toString, jMap) - } - - jsonMapper.writeValueAsBytes(asMap) - } } diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala index cc97a7019..7793b562e 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala @@ -142,7 +142,16 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec, info(s"Reading 
checkpoint for taskName $taskName") - populateTaskNamesToCheckpointsMap() + if (taskNamesToCheckpoints == null) { + info("Reading checkpoints for the first time") + taskNamesToCheckpoints = readCheckpoints() + if (stopConsumerAfterFirstRead) { + info("Stopping system consumer") + systemConsumer.stop() + } + } else if (!stopConsumerAfterFirstRead) { + taskNamesToCheckpoints ++= readCheckpoints() + } val checkpoint: Checkpoint = taskNamesToCheckpoints.getOrElse(taskName, null) @@ -150,14 +159,6 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec, checkpoint } - /** - * @inheritdoc - */ - override def readAllCheckpoints(): util.Map[TaskName, Checkpoint] = { - populateTaskNamesToCheckpointsMap() - scala.collection.JavaConverters.mapAsJavaMapConverter(taskNamesToCheckpoints).asJava - } - /** * @inheritdoc */ @@ -409,17 +410,4 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec, throw new IllegalArgumentException("Unknown checkpoint key type: " + checkpointKey.getType) } } - - private def populateTaskNamesToCheckpointsMap() = { - if (taskNamesToCheckpoints == null) { - info("Reading checkpoints for the first time") - taskNamesToCheckpoints = readCheckpoints() - if (stopConsumerAfterFirstRead) { - info("Stopping system consumer") - systemConsumer.stop() - } - } else if (!stopConsumerAfterFirstRead) { - taskNamesToCheckpoints ++= readCheckpoints() - } - } } diff --git a/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.java b/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.java index d0e4586a0..fe9bfb192 100644 --- a/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.java +++ b/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.java @@ -385,25 +385,6 @@ public class TestKafkaCheckpointManager { SystemConsumer.BLOCK_ON_OUTSTANDING_MESSAGES); } - @Test - public void testReadAllCheckpoints() throws InterruptedException { - Config config = config(ImmutableMap.of(TaskConfig.CHECKPOINT_READ_VERSIONS, "1,2")); - setupSystemFactory(config); - CheckpointV2 checkpointV2ForTask0 = buildCheckpointV2(INPUT_SSP0, "0"); - CheckpointV2 checkpointV2ForTask1 = buildCheckpointV2(INPUT_SSP0, "1"); - List<IncomingMessageEnvelope> checkpointEnvelopes = - ImmutableList.of( - newCheckpointV2Envelope(TASK0, checkpointV2ForTask0, "0"), - newCheckpointV2Envelope(TASK1, checkpointV2ForTask1, "1") - ); - setupConsumer(checkpointEnvelopes); - Map<TaskName, Checkpoint> checkpointMap = ImmutableMap.of(TASK0, checkpointV2ForTask0, TASK1, checkpointV2ForTask1); - KafkaCheckpointManager kafkaCheckpointManager = buildKafkaCheckpointManager(true, config); - kafkaCheckpointManager.register(TASK0); - Map<TaskName, Checkpoint> readCheckpoints = kafkaCheckpointManager.readAllCheckpoints(); - assertEquals(checkpointMap, readCheckpoints); - } - @Test public void testWriteCheckpointV1() { setupSystemFactory(config());