lakshmi-manasa-g commented on code in PR #1598:
URL: https://github.com/apache/samza/pull/1598#discussion_r865109887


##########
samza-core/src/main/java/org/apache/samza/elasticity/ElasticityUtils.java:
##########
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.elasticity;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Class with util methods to be used for checkpoint computation when elasticity is enabled
+ * Elasticity is supported only  for tasks created by either
+ * the {@link org.apache.samza.container.grouper.stream.GroupByPartition} SSP grouper or
+ * the {@link org.apache.samza.container.grouper.stream.GroupBySystemStreamPartition} SSP grouper
+ */
+public class ElasticityUtils {
+  private static final Logger log = LoggerFactory.getLogger(ElasticityUtils.class);
+
+  // GroupByPartition tasks have names like Partition 0_1_2
+  // where 0 is the partition number, 1 is the key bucket and 2 is the elasticity factor
+  // see {@link GroupByPartition.ELASTIC_TASK_NAME_FORMAT}
+  static final String ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX = "Partition (\\d+)_(\\d+)_(\\d+)";
+  static final String TASK_NAME_GROUP_BY_PARTITION_REGEX = "Partition (\\d+)";
+  static final String TASK_NAME_GROUP_BY_PARTITION_PREFIX = "Partition ";
+
+  //GroupBySSP tasks have names like "SystemStreamPartition [<system>, <Stream>, <partition>, keyBucket]_2"
+  // where 2 is the elasticity factor
+  // see {@link GroupBySystemStreamPartition} and {@link SystemStreamPartition.toString}
+  static final String ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX = "SystemStreamPartition \\[(\\S+), (\\S+), (\\d+), (\\d+)\\]_(\\d+)";
+  static final String TASK_NAME_GROUP_BY_SSP_REGEX = "SystemStreamPartition \\[(\\S+), (\\S+), (\\d+)\\]";
+  static final String TASK_NAME_GROUP_BY_SSP_PREFIX = "SystemStreamPartition ";
+
+  /**
+   * Elasticity is supported for GroupByPartition tasks and GroupBySystemStreamPartition tasks
+   * When elasticity is enabled, GroupByPartition tasks have names Partition 0_1_2
+   * When elasticity is enabled, GroupBySystemStreamPartition tasks have names SystemStreamPartition [systemA, streamB, 0, 1]_2
+   * Both tasks have names ending with _%d where %d is the elasticity factor
+   * @param taskName of either GroupByPartition or GroupBySystemStreamPartition task
+   * @return
+   *   for GroupByPartition and GroupBySystemStreamPartition tasks returns elasticity factor from the task name
+   *   for other tasks returns 1 which is the default elasticity factor
+   */
+  static int getElasticityFactorFromTaskName(TaskName taskName) {
+    return getTaskNameParts(taskName).elasticityFactor;
+  }
+
+  /**
+   * checks if the given taskname is of a GroupByPartition task
+   * @param taskName of any task
+   * @return true if GroupByPartition (starts with prefix "Partition ") or false otherwise
+   */
+  static boolean isGroupByPartitionTask(TaskName taskName) {
+    return taskName.getTaskName().startsWith(TASK_NAME_GROUP_BY_PARTITION_PREFIX);
+  }
+
+  /**
+   * checks if the given taskname is of a GroupBySystemStreamPartition task
+   * @param taskName of any task
+   * @return true if GroupBySystemStreamPartition (starts with prefix "SystemStreamPartition ") or false otherwise
+   */
+  static boolean isGroupBySystemStreamPartitionTask(TaskName taskName) {
+    return taskName.getTaskName().startsWith(TASK_NAME_GROUP_BY_SSP_PREFIX);
+  }
+
+  /**
+   * checks if given taskName is elastic aka created with an elasticity factor > 1
+   * @param taskName of any task
+   * @return true for following, false otherwise
+   *    for task created by GroupByPartition, taskName has format "Partition 0_1_2"
+   *    for task created by GroupBySystemStreamPartition, taskName has format "SystemStreamPartition [systemA, streamB, 0, 1]_2"
+   */
+  static boolean isTaskNameElastic(TaskName taskName) {

Review Comment:
   Yes, it can be done, but code-wise it will look similar, and additionally the caller has to know that -1 means the task is not elastic.
   That said, I am looking to get rid of this method. The latest commit to this PR actually doesn't use it.
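A minimal sketch of the sentinel-free shape discussed above, assuming the elasticity factor is parsed with the same regexes shown in the diff. Because the parser defaults to 1 for non-elastic names, "elastic" reduces to `factor > 1` and no caller has to know about a -1 sentinel. The class and method names (`ElasticTaskNameSketch`, `elasticityFactor`, `isElastic`) are hypothetical, the sketch works on plain strings rather than Samza's `TaskName`, and it is not the PR's implementation.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch, not the PR's ElasticityUtils: parses the elasticity factor
// out of a task name using the regexes from the diff.
public class ElasticTaskNameSketch {
  // "Partition <partition>_<keyBucket>_<elasticityFactor>"
  static final Pattern GROUP_BY_PARTITION =
      Pattern.compile("Partition (\\d+)_(\\d+)_(\\d+)");
  // "SystemStreamPartition [<system>, <stream>, <partition>, <keyBucket>]_<elasticityFactor>"
  static final Pattern GROUP_BY_SSP =
      Pattern.compile("SystemStreamPartition \\[(\\S+), (\\S+), (\\d+), (\\d+)\\]_(\\d+)");

  /** Returns the elasticity factor, or 1 (the default) when the name is not elastic. */
  public static int elasticityFactor(String taskName) {
    Matcher m = GROUP_BY_PARTITION.matcher(taskName);
    if (m.matches()) {
      return Integer.parseInt(m.group(3));
    }
    m = GROUP_BY_SSP.matcher(taskName);
    if (m.matches()) {
      return Integer.parseInt(m.group(5));
    }
    return 1; // default factor instead of a -1 sentinel
  }

  /** With a default of 1, "elastic" simply means a factor greater than 1. */
  public static boolean isElastic(String taskName) {
    return elasticityFactor(taskName) > 1;
  }

  public static void main(String[] args) {
    System.out.println(elasticityFactor("Partition 0_1_2")); // 2
    System.out.println(elasticityFactor("SystemStreamPartition [systemA, streamB, 0, 1]_2")); // 2
    System.out.println(isElastic("Partition 0")); // false
  }
}
```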
   



##########
samza-core/src/main/java/org/apache/samza/config/JobConfig.java:
##########
@@ -479,6 +484,10 @@ public boolean getContainerHeartbeatMonitorEnabled() {
     return getBoolean(CONTAINER_HEARTBEAT_MONITOR_ENABLED, CONTAINER_HEARTBEAT_MONITOR_ENABLED_DEFAULT);
   }
 
+  public boolean getElasticityCheckpointEnabled() {
+    return getBoolean(JOB_ELASTICITY_CHECKPOINTS_ENABLED, DEFAULT_JOB_ELASTICITY_CHECKPOINTS_ENABLED);

Review Comment:
   The reason for introducing this new config is the following scenario:
   
   1. First, the job has no elasticity (factor = 1 by default), and checkpoints are written with task names like "Partition 0".
   2. Next, the job enables elasticity (factor > 1), and checkpoints are written with task names like "Partition 0_0_2" instead of "Partition 0".
   3. Then, after a while, the job disables elasticity (factor = 1 again), and we want to use the checkpoints of "Partition 0_0_2" to compute the checkpoint of the task named "Partition 0".
   
   When this config is true, it tells us that we need to look for checkpoints of other tasks even if the task name does not match exactly, i.e. look for "Partition 0_0_2" and "Partition 0_1_2" to compute the checkpoint of task "Partition 0".
   
   Without this config, we would have to infer that elasticity was enabled in the past by looking at all checkpoints present in the checkpoint stream, and this flow would apply even to jobs that never enabled elasticity. I wanted to avoid changes to the existing flow and guard all new changes behind a config, hence this.
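A hypothetical sketch of the lookup this flag guards, restricted to GroupByPartition task names and assuming the regexes from the `ElasticityUtils` diff. With the flag off, only an exact task-name match is used (the existing flow). With it on, any checkpointed task with the same partition number becomes a candidate, e.g. "Partition 0_0_2" and "Partition 0_1_2" for "Partition 0". `CheckpointCandidatesSketch` and `candidateCheckpointTasks` are illustrative names; the sketch only selects candidate task names and does not compute the resulting offsets, which the comment does not describe.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch: which checkpointed tasks to consult for a target task.
public class CheckpointCandidatesSketch {
  // GroupByPartition name patterns, copied from the ElasticityUtils diff.
  static final Pattern ELASTIC = Pattern.compile("Partition (\\d+)_(\\d+)_(\\d+)");
  static final Pattern PLAIN = Pattern.compile("Partition (\\d+)");

  /** Partition number of a GroupByPartition task name (elastic or not), or -1 otherwise. */
  static int partitionOf(String taskName) {
    Matcher m = ELASTIC.matcher(taskName);
    if (m.matches()) {
      return Integer.parseInt(m.group(1));
    }
    m = PLAIN.matcher(taskName);
    if (m.matches()) {
      return Integer.parseInt(m.group(1));
    }
    return -1;
  }

  /** Task names whose checkpoints may be used to compute targetTask's checkpoint. */
  public static Set<String> candidateCheckpointTasks(
      String targetTask, Set<String> checkpointedTasks, boolean elasticityCheckpointsEnabled) {
    Set<String> candidates = new TreeSet<>(); // sorted for deterministic output
    if (checkpointedTasks.contains(targetTask)) {
      candidates.add(targetTask); // exact match: the existing lookup
    }
    if (!elasticityCheckpointsEnabled) {
      return candidates; // flag off: existing flow unchanged
    }
    int partition = partitionOf(targetTask);
    if (partition < 0) {
      return candidates; // not a GroupByPartition task name
    }
    for (String task : checkpointedTasks) {
      if (partitionOf(task) == partition) {
        candidates.add(task); // e.g. "Partition 0_0_2" for target "Partition 0"
      }
    }
    return candidates;
  }

  public static void main(String[] args) {
    Set<String> checkpointed = new HashSet<>(
        Arrays.asList("Partition 0_0_2", "Partition 0_1_2", "Partition 1_0_2"));
    // Flag off: "Partition 0" has no exact match, so nothing is found.
    System.out.println(candidateCheckpointTasks("Partition 0", checkpointed, false)); // []
    // Flag on: both elastic tasks of partition 0 are candidates.
    System.out.println(candidateCheckpointTasks("Partition 0", checkpointed, true));
    // [Partition 0_0_2, Partition 0_1_2]
  }
}
```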
   



-- 
This is an automated message from the Apache Git Service.