lakshmi-manasa-g commented on a change in pull request #1385:
URL: https://github.com/apache/samza/pull/1385#discussion_r443049116



##########
File path: 
samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -259,6 +277,44 @@ public void stop() {
     return oldestOffsets;
   }
 
+  /**
+   * Checks whether the given offset for the SSP has reached the latest offset (determined at init),
+   * removing it from the list of SSPs to catch up. Once the set of SSPs to catch up becomes empty, the latch for the
+   * task will count down, notifying {@link ContainerStorageManager} that it is caught up.
+   *
+   * @param ssp The SSP to be checked
+   * @param currentOffset The offset to be checked
+   * @param isStartingOffset Indicates whether the offset being checked is the starting offset of the SSP (and thus has
+   *                         not yet been processed). This will be set to true when each SSP's starting offset is checked
+   *                         on init, and false when checking if an SSP is caught up after processing an envelope.
+   */
+  private void checkCaughtUp(SystemStreamPartition ssp, String currentOffset, boolean isStartingOffset) {
+    String offsetToBlockUntil = this.sspOffsetsToBlockUntil.get(ssp);
+
+    LOG.trace("Checking offset {} against {} for {}. isStartingOffset: {}", currentOffset, offsetToBlockUntil, ssp, isStartingOffset);
+
+    Integer comparatorResult;
+    if (currentOffset == null || offsetToBlockUntil == null) {
+      comparatorResult = -1;
+    } else {
+      SystemAdmin systemAdmin = systemAdmins.getSystemAdmin(ssp.getSystem());
+      comparatorResult = systemAdmin.offsetComparator(currentOffset, offsetToBlockUntil);
+    }
+
+    // If the starting offset, it must be greater (since the envelope at the starting offset will not yet have been processed)

Review comment:
       I'm confused: the starting offset could be the last processed offset on file (or the oldest from the source), and offsetToBlockUntil is essentially the newest offset from the metadata cache, right? In that case, isn't the starting offset <= the newest offset: "=" if nothing is present after the last processed offset, and "<" if something is present after it?
   This confusion might be arising from my (possibly muddled) understanding of the metadata cache and the last processed offset on file.
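To make the comparison under discussion concrete, here is a minimal, self-contained sketch (not the Samza code under review) of the caught-up check, assuming offsets are numeric strings, as Kafka-style offsets are; the class and method names are illustrative only.

```java
// Minimal sketch of the caught-up check being discussed, assuming numeric
// string offsets. Not the actual Samza implementation.
public class CaughtUpSketch {

  // Stand-in for SystemAdmin.offsetComparator: negative if a < b,
  // zero if equal, positive if a > b.
  static Integer offsetComparator(String a, String b) {
    return Long.compare(Long.parseLong(a), Long.parseLong(b));
  }

  // An SSP counts as caught up once its offset reaches the newest offset
  // recorded at init. A starting offset has not been processed yet, so it
  // must be strictly greater; an offset checked after processing an
  // envelope only needs to be greater than or equal.
  static boolean isCaughtUp(String currentOffset, String offsetToBlockUntil, boolean isStartingOffset) {
    int result = (currentOffset == null || offsetToBlockUntil == null)
        ? -1
        : offsetComparator(currentOffset, offsetToBlockUntil);
    return isStartingOffset ? result > 0 : result >= 0;
  }

  public static void main(String[] args) {
    // Last processed offset on file is 5 and the newest offset is also 5:
    // the starting offset (6, the next message to consume) is already past
    // the newest, so the task is caught up at init.
    System.out.println(isCaughtUp("6", "5", true));   // true
    // After processing the envelope at offset 5, equality suffices.
    System.out.println(isCaughtUp("5", "5", false));  // true
  }
}
```

This is why, for a starting offset, "greater than" rather than "greater than or equal" is required: the envelope at the starting offset itself has not yet been consumed.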

##########
File path: samza-core/src/main/java/org/apache/samza/storage/SideInputTask.java
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage;
+
+import java.util.Collections;
+import java.util.Set;
+import org.apache.samza.checkpoint.OffsetManager;
+import org.apache.samza.container.RunLoopTask;
+import org.apache.samza.container.TaskInstanceMetrics;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.scheduler.EpochTimeScheduler;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.ReadableCoordinator;
+import org.apache.samza.task.TaskCallback;
+import org.apache.samza.task.TaskCallbackFactory;
+
+
+/**
+ * This class encapsulates the processing logic for side input streams. It is executed by {@link org.apache.samza.container.RunLoop}
+ */
+public class SideInputTask implements RunLoopTask {
+  private final TaskName taskName;
+  private final Set<SystemStreamPartition> taskSSPs;
+  private final TaskSideInputHandler taskSideInputHandler;
+  private final TaskInstanceMetrics metrics;
+
+  public SideInputTask(
+      TaskName taskName,
+      Set<SystemStreamPartition> taskSSPs,
+      TaskSideInputHandler taskSideInputHandler,
+      TaskInstanceMetrics metrics) {
+    this.taskName = taskName;
+    this.taskSSPs = taskSSPs;
+    this.taskSideInputHandler = taskSideInputHandler;
+    this.metrics = metrics;
+  }
+
+  @Override
+  public TaskName taskName() {

Review comment:
       Minor: why do we have taskName instead of getTaskName for a getter method? This PR is implementing the methods rather than introducing them, so it's not really a comment for this PR :) I just wanted to know, for my own understanding, whether we changed the guidelines/conventions recently.

##########
File path: 
samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -119,6 +125,17 @@ public void init() {
 
     this.startingOffsets = getStartingOffsets(fileOffsets, getOldestOffsets());
     LOG.info("Starting offsets for the task {}: {}", taskName, startingOffsets);
+
+    this.sspOffsetsToBlockUntil = new HashMap<>();
+    for (SystemStreamPartition ssp : this.sspToStores.keySet()) {
+      SystemStreamMetadata metadata = this.streamMetadataCache.getSystemStreamMetadata(ssp.getSystemStream(), false);
+      if (metadata != null) {
+        String offset = metadata.getSystemStreamPartitionMetadata().get(ssp.getPartition()).getNewestOffset();
+        this.sspOffsetsToBlockUntil.put(ssp, offset);
+      }
+    }
+

Review comment:
       minor: so far, init() has been a high-level method that calls helpers to do the actual work, so when reading it I could easily follow the steps init takes without having to understand how each step is performed. Hence, I'm wondering if there is value in moving this piece into a helper, similar to `getStartingOffsets`?
   I feel it serves two purposes: it keeps init() a quick read, and it clarifies at a single glance what this step does. A javadoc for the helper would be a nice bonus ;)
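The extraction suggested above might look roughly like the following self-contained sketch. The helper name (getNewestOffsets) is hypothetical, and plain Strings stand in for Samza's SystemStreamPartition and StreamMetadataCache types so the sketch runs on its own.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Illustrative sketch of pulling the newest-offset lookup out of init()
// into a named helper, so init() remains a sequence of high-level steps:
//   this.sspOffsetsToBlockUntil = getNewestOffsets(...);
public class NewestOffsetsSketch {

  static Map<String, String> getNewestOffsets(Set<String> ssps, Map<String, String> metadataCache) {
    Map<String, String> newestOffsets = new HashMap<>();
    for (String ssp : ssps) {
      // Mirrors the null-metadata guard in the PR: SSPs with no metadata
      // are simply skipped.
      String newestOffset = metadataCache.get(ssp);
      if (newestOffset != null) {
        newestOffsets.put(ssp, newestOffset);
      }
    }
    return newestOffsets;
  }

  public static void main(String[] args) {
    Map<String, String> cache = new HashMap<>();
    cache.put("kafka.side-input-0", "42");
    // The SSP with no cached metadata is omitted from the result.
    System.out.println(getNewestOffsets(Set.of("kafka.side-input-0", "kafka.side-input-1"), cache));
  }
}
```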

##########
File path: 
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
##########
@@ -154,17 +149,13 @@
   private final Map<TaskName, Map<String, Set<SystemStreamPartition>>> taskSideInputStoreSSPs;
   private final Map<SystemStreamPartition, TaskSideInputHandler> sspSideInputHandlers;
   private SystemConsumers sideInputSystemConsumers;
-  private final Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> initialSideInputSSPMetadata
-      = new ConcurrentHashMap<>(); // Recorded sspMetadata of the taskSideInputSSPs recorded at start, used to determine when sideInputs are caughtup and container init can proceed
-  private volatile CountDownLatch sideInputsCaughtUp; // Used by the sideInput-read thread to signal to the main thread
+  private volatile Map<TaskName, CountDownLatch> sideInputTaskLatches; // Used by the sideInput-read thread to signal to the main thread
   private volatile boolean shouldShutdown = false;
+  private RunLoop sideInputRunLoop;
 
   private final ExecutorService sideInputsReadExecutor = Executors.newSingleThreadExecutor(
       new ThreadFactoryBuilder().setDaemon(true).setNameFormat(SIDEINPUTS_READ_THREAD_NAME).build());
 
-  private final ScheduledExecutorService sideInputsFlushExecutor = Executors.newSingleThreadScheduledExecutor(
-      new ThreadFactoryBuilder().setDaemon(true).setNameFormat(SIDEINPUTS_FLUSH_THREAD_NAME).build());
-  private ScheduledFuture sideInputsFlushFuture;

Review comment:
       So flush (aka commit) was happening in a separate thread before, right? Now we have moved it into the same thread as read (where the side input RunLoop now lives). Why did we have separate threads for read and flush earlier, and what is the effect of removing the flush thread?

##########
File path: 
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
##########
@@ -758,41 +744,50 @@ public void run() {
       sideInputSystemConsumers.register(ssp, startingOffset);
       taskInstanceMetrics.get(this.sspSideInputHandlers.get(ssp).getTaskName()).addOffsetGauge(
           ssp, ScalaJavaUtil.toScalaFunction(() -> this.sspSideInputHandlers.get(ssp).getLastProcessedOffset(ssp)));
+      sideInputTaskMetrics.get(this.sspSideInputHandlers.get(ssp).getTaskName()).addOffsetGauge(
+          ssp, ScalaJavaUtil.toScalaFunction(() -> this.sspSideInputHandlers.get(ssp).getLastProcessedOffset(ssp)));
+    }
 
-      SystemStreamMetadata systemStreamMetadata = streamMetadataCache.getSystemStreamMetadata(ssp.getSystemStream(), false);
-      SystemStreamMetadata.SystemStreamPartitionMetadata sspMetadata =
-          (systemStreamMetadata == null) ? null : systemStreamMetadata.getSystemStreamPartitionMetadata().get(ssp.getPartition());
+    Map<TaskName, TaskSideInputHandler> taskSideInputHandlers = this.sspSideInputHandlers.values().stream()
+        .distinct()
+        .collect(Collectors.toMap(TaskSideInputHandler::getTaskName, Function.identity()));
 
-      // record a copy of the sspMetadata, to later check if its caught up
-      initialSideInputSSPMetadata.put(ssp, sspMetadata);
+    Map<TaskName, RunLoopTask> sideInputTasks = new HashMap<>();
+    this.taskSideInputStoreSSPs.forEach((taskName, storesToSSPs) -> {
+        Set<SystemStreamPartition> taskSSPs = this.taskSideInputStoreSSPs.get(taskName).values().stream()
+            .flatMap(Set::stream)
+            .collect(Collectors.toSet());
 
-      // check if the ssp is caught to upcoming, even at start
-      checkSideInputCaughtUp(ssp, startingOffset, SystemStreamMetadata.OffsetType.UPCOMING, false);
-    }
+        RunLoopTask sideInputTask = new SideInputTask(taskName, taskSSPs, taskSideInputHandlers.get(taskName), sideInputTaskMetrics.get(taskName));
+
+        sideInputTasks.put(taskName, sideInputTask);
+      });
 
     // start the systemConsumers for consuming input
     this.sideInputSystemConsumers.start();
 
+    TaskConfig taskConfig = new TaskConfig(this.config);
+    SamzaContainerMetrics sideInputContainerMetrics =
+        new SamzaContainerMetrics(SIDEINPUTS_METRICS_PREFIX + this.samzaContainerMetrics.source(),
+            this.samzaContainerMetrics.registry());
+
+    this.sideInputRunLoop = new RunLoop(sideInputTasks,
+        null, // all operations are executed in the main runloop thread
+        this.sideInputSystemConsumers,
+        1, // single message in flight per task
+        -1, // no windowing
+        taskConfig.getCommitMs(),
+        taskConfig.getCallbackTimeoutMs(),
+        1, // default taken from SamzaContainer

Review comment:
       This is supposed to be maxThrottlingDelayMs, i.e. in milliseconds, right? Why are we passing a value of 1 here instead of 1 second?
   SamzaContainer uses the default of `TimeUnit.SECONDS.toMillis(1)`, which is 1 second converted to milliseconds.
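For reference, a tiny snippet showing the default the comment refers to; the class and variable names here are illustrative, not from the PR.

```java
import java.util.concurrent.TimeUnit;

// The default the comment cites: SamzaContainer passes
// TimeUnit.SECONDS.toMillis(1), i.e. 1000 ms, as maxThrottlingDelayMs,
// while the code above passes the literal 1 (one millisecond).
public class ThrottlingDelayDefault {
  public static void main(String[] args) {
    long defaultMaxThrottlingDelayMs = TimeUnit.SECONDS.toMillis(1);
    System.out.println(defaultMaxThrottlingDelayMs); // prints 1000
  }
}
```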

##########
File path: 
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
##########
@@ -758,41 +744,50 @@ public void run() {
       sideInputSystemConsumers.register(ssp, startingOffset);
       taskInstanceMetrics.get(this.sspSideInputHandlers.get(ssp).getTaskName()).addOffsetGauge(
           ssp, ScalaJavaUtil.toScalaFunction(() -> this.sspSideInputHandlers.get(ssp).getLastProcessedOffset(ssp)));
+      sideInputTaskMetrics.get(this.sspSideInputHandlers.get(ssp).getTaskName()).addOffsetGauge(
+          ssp, ScalaJavaUtil.toScalaFunction(() -> this.sspSideInputHandlers.get(ssp).getLastProcessedOffset(ssp)));
+    }
 
-      SystemStreamMetadata systemStreamMetadata = streamMetadataCache.getSystemStreamMetadata(ssp.getSystemStream(), false);
-      SystemStreamMetadata.SystemStreamPartitionMetadata sspMetadata =
-          (systemStreamMetadata == null) ? null : systemStreamMetadata.getSystemStreamPartitionMetadata().get(ssp.getPartition());
+    Map<TaskName, TaskSideInputHandler> taskSideInputHandlers = this.sspSideInputHandlers.values().stream()
+        .distinct()
+        .collect(Collectors.toMap(TaskSideInputHandler::getTaskName, Function.identity()));
 
-      // record a copy of the sspMetadata, to later check if its caught up
-      initialSideInputSSPMetadata.put(ssp, sspMetadata);
+    Map<TaskName, RunLoopTask> sideInputTasks = new HashMap<>();
+    this.taskSideInputStoreSSPs.forEach((taskName, storesToSSPs) -> {
+        Set<SystemStreamPartition> taskSSPs = this.taskSideInputStoreSSPs.get(taskName).values().stream()
+            .flatMap(Set::stream)
+            .collect(Collectors.toSet());
 
-      // check if the ssp is caught to upcoming, even at start
-      checkSideInputCaughtUp(ssp, startingOffset, SystemStreamMetadata.OffsetType.UPCOMING, false);
-    }
+        RunLoopTask sideInputTask = new SideInputTask(taskName, taskSSPs, taskSideInputHandlers.get(taskName), sideInputTaskMetrics.get(taskName));
+
+        sideInputTasks.put(taskName, sideInputTask);
+      });
 
     // start the systemConsumers for consuming input
     this.sideInputSystemConsumers.start();
 
+    TaskConfig taskConfig = new TaskConfig(this.config);
+    SamzaContainerMetrics sideInputContainerMetrics =
+        new SamzaContainerMetrics(SIDEINPUTS_METRICS_PREFIX + this.samzaContainerMetrics.source(),
+            this.samzaContainerMetrics.registry());
+
+    this.sideInputRunLoop = new RunLoop(sideInputTasks,

Review comment:
       Not a comment: are we not using the RunLoopFactory because we don't need to figure out the type of task, or to avoid its log statements?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

