mynameborat commented on a change in pull request #1385:
URL: https://github.com/apache/samza/pull/1385#discussion_r443689543



##########
File path: samza-core/src/main/java/org/apache/samza/storage/SideInputTask.java
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage;
+
+import java.util.Collections;
+import java.util.Set;
+import org.apache.samza.checkpoint.OffsetManager;
+import org.apache.samza.container.RunLoopTask;
+import org.apache.samza.container.TaskInstanceMetrics;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.scheduler.EpochTimeScheduler;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.ReadableCoordinator;
+import org.apache.samza.task.TaskCallback;
+import org.apache.samza.task.TaskCallbackFactory;
+
+
+/**
+ * This class encapsulates the processing logic for side input streams. It is executed by {@link org.apache.samza.container.RunLoop}
+ */
+public class SideInputTask implements RunLoopTask {
+  private final TaskName taskName;
+  private final Set<SystemStreamPartition> taskSSPs;
+  private final TaskSideInputHandler taskSideInputHandler;
+  private final TaskInstanceMetrics metrics;
+
+  public SideInputTask(
+      TaskName taskName,
+      Set<SystemStreamPartition> taskSSPs,
+      TaskSideInputHandler taskSideInputHandler,
+      TaskInstanceMetrics metrics) {
+    this.taskName = taskName;
+    this.taskSSPs = taskSSPs;
+    this.taskSideInputHandler = taskSideInputHandler;
+    this.metrics = metrics;
+  }
+
+  @Override
+  public TaskName taskName() {
+    return this.taskName;
+  }
+
+  @Override
+  public void process(IncomingMessageEnvelope envelope, ReadableCoordinator coordinator,
+      TaskCallbackFactory callbackFactory) {
+    TaskCallback callback = callbackFactory.createCallback();
+    this.metrics.processes().inc();
+    try {
+      this.taskSideInputHandler.process(envelope);
+      this.metrics.messagesActuallyProcessed().inc();
+      callback.complete();
+    } catch (Exception e) {
+      callback.failure(e);
+    }
+  }
+
+  @Override
+  public void window(ReadableCoordinator coordinator) {
+
+  }
+
+  @Override
+  public void scheduler(ReadableCoordinator coordinator) {
+
+  }

Review comment:
       I understand these don't get called for side input tasks. Do we still want to throw exceptions or log warnings here as a safeguard?
   Maybe consider this for other methods that aren't implemented as part of the interface.
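   
   A minimal sketch of the fail-fast variant of this safeguard. `RunLoopTaskLike` is a hypothetical, trimmed-down stand-in for `RunLoopTask` so the snippet compiles on its own; the real methods take a `ReadableCoordinator` and the interface has more operations.

```java
// Hypothetical, trimmed-down stand-in for RunLoopTask; the real interface's methods take
// coordinator/callback parameters and include more operations.
interface RunLoopTaskLike {
  void window();

  void scheduler();
}

class GuardedSideInputTask implements RunLoopTaskLike {
  @Override
  public void window() {
    // The side input run loop is created with windowing disabled, so reaching this is a wiring bug.
    throw new UnsupportedOperationException("window() is not supported for side input tasks");
  }

  @Override
  public void scheduler() {
    // scheduler() is not expected to be called for side input tasks either.
    throw new UnsupportedOperationException("scheduler() is not supported for side input tasks");
  }
}
```

   Replacing each `throw` with a warning log would be the softer option if an unexpected call should be tolerated rather than fail fast.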

##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -63,19 +65,23 @@
   private final Map<String, SideInputsProcessor> storeToProcessor;
   private final SystemAdmins systemAdmins;
   private final StreamMetadataCache streamMetadataCache;
+  // indicates to container that all side input ssps in this task are caught up

Review comment:
       nit: "indicates to _CSM_" would be simpler yet still accurate. As written, understanding this comment requires the entire e2e picture of how CSM interacts with the container.

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
##########
@@ -110,14 +108,12 @@
 public class ContainerStorageManager {
   private static final Logger LOG = LoggerFactory.getLogger(ContainerStorageManager.class);
   private static final String RESTORE_THREAD_NAME = "Samza Restore Thread-%d";
-  private static final String SIDEINPUTS_READ_THREAD_NAME = "SideInputs Read Thread";
-  private static final String SIDEINPUTS_FLUSH_THREAD_NAME = "SideInputs Flush Thread";
+  private static final String SIDEINPUTS_THREAD_NAME = "SideInputs Thread";
   private static final String SIDEINPUTS_METRICS_PREFIX = "side-inputs-";
   // We use a prefix to differentiate the SystemConsumersMetrics for sideInputs from the ones in SamzaContainer
 
-  private static final int SIDE_INPUT_READ_THREAD_TIMEOUT_SECONDS = 10; // Timeout with which sideinput read thread checks for exceptions
-  private static final Duration SIDE_INPUT_FLUSH_TIMEOUT = Duration.ofMinutes(1); // Period with which sideinputs are flushed
-
+  private static final int SIDE_INPUT_LATCH_TIMEOUT_SECONDS = 10; // Timeout with which sideinput thread checks for exceptions

Review comment:
       Can we update the comment here? Currently, this latch is used to check for caught-up status; the side input exceptions get populated by the run loop flow.
   
   Also, I'd suggest renaming this to something like `SIDE_INPUT_CAUGHT_CHECK_TIMEOUT` instead of referring to the latch, since otherwise one needs to figure out what the latch is used for. Maybe also clarify in the comment that the check timeout applies across all the side inputs.
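   
   A minimal sketch of how the renamed constant and its comment could read, assuming the main thread waits on a single caught-up latch in rounds of `SIDE_INPUT_CAUGHT_CHECK_TIMEOUT` and checks between rounds for a failure reported by the run loop flow. `SideInputCatchUpWaiter`, `reportFailure`, and `awaitCaughtUp` are hypothetical names, not code from this PR.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class SideInputCatchUpWaiter {
  // Timeout for each check of whether the side inputs have caught up. It bounds a single wait on the
  // caught-up latch, which covers all side inputs, not one wait per SSP or per task.
  static final Duration SIDE_INPUT_CAUGHT_CHECK_TIMEOUT = Duration.ofSeconds(10);

  private final CountDownLatch caughtUpLatch;
  // Hypothetical: populated by the side input run loop flow if processing fails.
  private volatile Throwable sideInputException;

  SideInputCatchUpWaiter(CountDownLatch caughtUpLatch) {
    this.caughtUpLatch = caughtUpLatch;
  }

  void reportFailure(Throwable failure) {
    this.sideInputException = failure;
  }

  // Blocks until all side inputs have caught up, surfacing any reported failure between checks.
  void awaitCaughtUp() {
    try {
      while (true) {
        Throwable failure = sideInputException;
        if (failure != null) {
          throw new IllegalStateException("Side inputs failed while catching up", failure);
        }
        if (caughtUpLatch.await(SIDE_INPUT_CAUGHT_CHECK_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)) {
          return;
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for side inputs to catch up", e);
    }
  }
}
```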

##########
File path: samza-core/src/main/java/org/apache/samza/storage/SideInputTask.java
##########
@@ -0,0 +1,124 @@
+/**
+ * This class encapsulates the processing logic for side input streams. It is executed by {@link org.apache.samza.container.RunLoop}
+ */
+public class SideInputTask implements RunLoopTask {
+  private final TaskName taskName;
+  private final Set<SystemStreamPartition> taskSSPs;
+  private final TaskSideInputHandler taskSideInputHandler;
+  private final TaskInstanceMetrics metrics;
+
+  public SideInputTask(
+      TaskName taskName,
+      Set<SystemStreamPartition> taskSSPs,
+      TaskSideInputHandler taskSideInputHandler,
+      TaskInstanceMetrics metrics) {
+    this.taskName = taskName;
+    this.taskSSPs = taskSSPs;
+    this.taskSideInputHandler = taskSideInputHandler;
+    this.metrics = metrics;
+  }
+
+  @Override
+  public TaskName taskName() {
+    return this.taskName;
+  }
+
+  @Override
+  public void process(IncomingMessageEnvelope envelope, ReadableCoordinator coordinator,
+      TaskCallbackFactory callbackFactory) {
+    TaskCallback callback = callbackFactory.createCallback();
+    this.metrics.processes().inc();
+    try {
+      this.taskSideInputHandler.process(envelope);
+      this.metrics.messagesActuallyProcessed().inc();

Review comment:
       > New side input processing metrics emitted by SamzaContainerMetrics and TaskInstanceMetrics are given their own namespace in order to differentiate from the primary container / run loop.
   
   Does this mean that, after this change, the old metrics (if any were emitted) won't work? If we didn't emit metrics for these prior to this change, well and good. If we did, I'd suggest updating the API changes description to cover the metrics.

##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -259,6 +288,44 @@ public void stop() {
     return oldestOffsets;
   }
 
+  /**
+   * An SSP is considered caught up once the offset indicated for it in {@link #sspOffsetsToBlockUntil} has been
+   * processed. Once the set of SSPs to catch up becomes empty, the latch for the task will count down, notifying
+   * {@link ContainerStorageManager} that it is caught up.
+   *
+   * @param ssp The SSP to be checked
+   * @param currentOffset The offset to be checked
+   * @param isStartingOffset Indicates whether the offset being checked is the starting offset of the SSP (and thus has
+   *                         not yet been processed). This will be set to true when each SSP's starting offset is checked
+   *                         on init, and false when checking if an ssp is caught up after processing an envelope.
+   */
+  private void checkCaughtUp(SystemStreamPartition ssp, String currentOffset, boolean isStartingOffset) {
+    String offsetToBlockUntil = this.sspOffsetsToBlockUntil.get(ssp);
+
+    LOG.trace("Checking offset {} against {} for {}. isStartingOffset: {}", currentOffset, offsetToBlockUntil, ssp, isStartingOffset);
+
+    Integer comparatorResult;
+    if (currentOffset == null || offsetToBlockUntil == null) {
+      comparatorResult = -1;
+    } else {
+      SystemAdmin systemAdmin = systemAdmins.getSystemAdmin(ssp.getSystem());
+      comparatorResult = systemAdmin.offsetComparator(currentOffset, offsetToBlockUntil);
+    }
+
+    // If the starting offset, it must be greater (since the envelope at the starting offset will not yet have been processed)
+    // If not the starting offset, it must be greater than OR equal
+    if (comparatorResult != null && ((isStartingOffset && comparatorResult > 0) || (!isStartingOffset && comparatorResult >= 0))) {
+      LOG.info("Side input ssp {} has caught up to offset {}.", ssp, offsetToBlockUntil);

Review comment:
       Can we keep the existing functionality as is in this PR? Why do we need to differentiate between this being checked at start vs. during process?

##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -119,6 +125,28 @@ public void init() {
 
     this.startingOffsets = getStartingOffsets(fileOffsets, getOldestOffsets());
     LOG.info("Starting offsets for the task {}: {}", taskName, startingOffsets);
+
+    this.sspOffsetsToBlockUntil = getOffsetsToBlockUntil();
+    LOG.info("Task {} will catch up to offsets {}", this.taskName, this.sspOffsetsToBlockUntil);
+
+    this.startingOffsets.forEach((ssp, offset) -> checkCaughtUp(ssp, offset, true));
+  }
+
+  /**
+   * Retrieves the newest offset for each SSP
+   *
+   * @return a map of SSP to newest offset
+   */
+  private Map<SystemStreamPartition, String> getOffsetsToBlockUntil() {
+    Map<SystemStreamPartition, String> offsetsToBlockUntil = new HashMap<>();
+    for (SystemStreamPartition ssp : this.sspToStores.keySet()) {
+      SystemStreamMetadata metadata = this.streamMetadataCache.getSystemStreamMetadata(ssp.getSystemStream(), false);

Review comment:
       minor: it would be good to extract the boolean argument to a variable for readability.
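   
   A minimal sketch of the extraction. `MetadataLookup` is a hypothetical stand-in for `StreamMetadataCache`, and the name `partitionsMetadataOnly` assumes that is what the boolean means in `getSystemStreamMetadata`; worth double-checking against the `StreamMetadataCache` signature before adopting it.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for StreamMetadataCache; the boolean parameter is assumed to be
// partitionsMetadataOnly, as in getSystemStreamMetadata(SystemStream, boolean).
interface MetadataLookup {
  String newestOffset(String stream, boolean partitionsMetadataOnly);
}

class OffsetsToBlockUntil {
  static Map<String, String> compute(MetadataLookup lookup, List<String> streams) {
    // A named local documents the call site; a bare `false` says nothing about what is requested.
    boolean partitionsMetadataOnly = false;
    Map<String, String> offsets = new HashMap<>();
    for (String stream : streams) {
      offsets.put(stream, lookup.newestOffset(stream, partitionsMetadataOnly));
    }
    return offsets;
  }
}
```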

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
##########
@@ -604,6 +597,9 @@ private StorageEngine createStore(String storeName, TaskName taskName, TaskModel
          Map<String, StorageEngine> sideInputStores = getSideInputStores(taskName);
          Map<String, Set<SystemStreamPartition>> sideInputStoresToSSPs = new HashMap<>();
 
+          CountDownLatch taskCountDownLatch = new CountDownLatch(1);
+          this.sideInputTaskLatches.put(taskName, taskCountDownLatch);

Review comment:
       Why do we need granular (per-task) latches? Can we not use one latch initialized to the number of side input tasks and check that it reaches 0 instead?
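   
   A minimal sketch of the single-latch alternative, assuming each side input task signals exactly once, when all of its SSPs have caught up. `SharedCaughtUpLatch` and its methods are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class SharedCaughtUpLatch {
  private final CountDownLatch allTasksCaughtUp;

  SharedCaughtUpLatch(int numSideInputTasks) {
    // One count per side input task; the latch reaches 0 once every task has caught up.
    this.allTasksCaughtUp = new CountDownLatch(numSideInputTasks);
  }

  // Must be called exactly once per task, when all of that task's side input SSPs have caught up.
  void onTaskCaughtUp() {
    allTasksCaughtUp.countDown();
  }

  // Returns true if all tasks caught up within the timeout, false otherwise.
  boolean awaitAllCaughtUp(long timeout, TimeUnit unit) throws InterruptedException {
    return allTasksCaughtUp.await(timeout, unit);
  }

  long tasksStillCatchingUp() {
    return allTasksCaughtUp.getCount();
  }
}
```

   One trade-off to weigh: a shared latch relies on each task counting down exactly once (a repeated `countDown()` from the same task would release the wait early), whereas per-task latches make repeated signals harmless and make it easy to log which task is still behind.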

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
##########
@@ -154,17 +149,13 @@
   private final Map<TaskName, Map<String, Set<SystemStreamPartition>>> taskSideInputStoreSSPs;
   private final Map<SystemStreamPartition, TaskSideInputHandler> sspSideInputHandlers;
   private SystemConsumers sideInputSystemConsumers;
-  private final Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> initialSideInputSSPMetadata
-      = new ConcurrentHashMap<>(); // Recorded sspMetadata of the taskSideInputSSPs recorded at start, used to determine when sideInputs are caughtup and container init can proceed
-  private volatile CountDownLatch sideInputsCaughtUp; // Used by the sideInput-read thread to signal to the main thread
+  private volatile Map<TaskName, CountDownLatch> sideInputTaskLatches; // Used by the sideInput-read thread to signal to the main thread
   private volatile boolean shouldShutdown = false;
+  private RunLoop sideInputRunLoop;
 
   private final ExecutorService sideInputsReadExecutor = Executors.newSingleThreadExecutor(
       new ThreadFactoryBuilder().setDaemon(true).setNameFormat(SIDEINPUTS_READ_THREAD_NAME).build());
 
-  private final ScheduledExecutorService sideInputsFlushExecutor = Executors.newSingleThreadScheduledExecutor(
-      new ThreadFactoryBuilder().setDaemon(true).setNameFormat(SIDEINPUTS_FLUSH_THREAD_NAME).build());
-  private ScheduledFuture sideInputsFlushFuture;

Review comment:
       CSM originally handled only state restoration, which didn't require any commit/flush semantics. Eventually side inputs got rolled into CSM to pave the way for standby, and side inputs needed to be flushed at our commit cadence. That required a separate timer thread to trigger flush/commit on the task.commit.ms cadence. Instead of adding logic to coordinate signals between the read thread and the timer thread, flush was embedded in the timer thread and synchronization was introduced between process & commit.
   
   With the run loop, the internal timer thread that the run loop maintains does the job of signaling commit readiness to the task, and hence you don't need a separate flush thread. However, one thing to note is that previously we could potentially have two operations (process & flush) running in parallel for different side inputs, but with the new setup we would need a thread pool of size (number of side input tasks + 1) to ensure parity. We will eventually support parallelism across stores/side input tasks as part of transactional state support (I guess).
   
   So it is fine to leave it as is.

##########
File path: samza-core/src/main/java/org/apache/samza/storage/SideInputTask.java
##########
@@ -0,0 +1,124 @@
+/**
+ * This class encapsulates the processing logic for side input streams. It is executed by {@link org.apache.samza.container.RunLoop}
+ */
+public class SideInputTask implements RunLoopTask {

Review comment:
       I am not sure I follow the latter part of the comment @bkonold. The exclusivity holds regardless, because the side input handler synchronizes flush & process. `SideInputTask` in its current state is also not likely to be impacted, because all it does is delegate calls to the handler & increment metrics.
   
   Handling async commit might be a bit more involved than just synchronizing process & commit. I'd need to think a bit more about this. For now, I'd suggest adding a validation, if possible, that throws if async commit is enabled.
   
   To Manasa's point, the fact that process & commit can happen concurrently doesn't translate to thread safety concerns for all implementations. It is possible that the implementations of process & commit don't need synchronization. However, it is useful to call out that synchronization between them was explicitly omitted due to the state of the current implementation.
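   
   A minimal sketch of that validation, assuming async commit is controlled by the `task.async.commit` key with a default of `false`. `SideInputConfigValidator` is a hypothetical helper, and a plain `Map` stands in for Samza's `Config`.

```java
import java.util.Map;

final class SideInputConfigValidator {
  // Assumed config key for enabling async commit; verify against TaskConfig before relying on it.
  static final String ASYNC_COMMIT_KEY = "task.async.commit";

  private SideInputConfigValidator() { }

  // Fail fast: async commit handling for side input tasks has not been worked out yet.
  static void validateNoAsyncCommit(Map<String, String> config) {
    boolean asyncCommit = Boolean.parseBoolean(config.getOrDefault(ASYNC_COMMIT_KEY, "false"));
    if (asyncCommit) {
      throw new IllegalStateException(
          ASYNC_COMMIT_KEY + " is not supported when side inputs are processed by the run loop");
    }
  }
}
```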
   
   

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
##########
@@ -758,41 +745,50 @@ public void run() {
       sideInputSystemConsumers.register(ssp, startingOffset);
       taskInstanceMetrics.get(this.sspSideInputHandlers.get(ssp).getTaskName()).addOffsetGauge(
           ssp, ScalaJavaUtil.toScalaFunction(() -> this.sspSideInputHandlers.get(ssp).getLastProcessedOffset(ssp)));
+      sideInputTaskMetrics.get(this.sspSideInputHandlers.get(ssp).getTaskName()).addOffsetGauge(
+          ssp, ScalaJavaUtil.toScalaFunction(() -> this.sspSideInputHandlers.get(ssp).getLastProcessedOffset(ssp)));
+    }
 
-      SystemStreamMetadata systemStreamMetadata = streamMetadataCache.getSystemStreamMetadata(ssp.getSystemStream(), false);
-      SystemStreamMetadata.SystemStreamPartitionMetadata sspMetadata =
-          (systemStreamMetadata == null) ? null : systemStreamMetadata.getSystemStreamPartitionMetadata().get(ssp.getPartition());
+    Map<TaskName, TaskSideInputHandler> taskSideInputHandlers = this.sspSideInputHandlers.values().stream()
+        .distinct()
+        .collect(Collectors.toMap(TaskSideInputHandler::getTaskName, Function.identity()));
 
-      // record a copy of the sspMetadata, to later check if its caught up
-      initialSideInputSSPMetadata.put(ssp, sspMetadata);
+    Map<TaskName, RunLoopTask> sideInputTasks = new HashMap<>();
+    this.taskSideInputStoreSSPs.forEach((taskName, storesToSSPs) -> {
+        Set<SystemStreamPartition> taskSSPs = this.taskSideInputStoreSSPs.get(taskName).values().stream()
+            .flatMap(Set::stream)
+            .collect(Collectors.toSet());
 
-      // check if the ssp is caught to upcoming, even at start
-      checkSideInputCaughtUp(ssp, startingOffset, SystemStreamMetadata.OffsetType.UPCOMING, false);
-    }
+        RunLoopTask sideInputTask = new SideInputTask(taskName, taskSSPs, taskSideInputHandlers.get(taskName), sideInputTaskMetrics.get(taskName));
+
+        sideInputTasks.put(taskName, sideInputTask);
+      });
 
     // start the systemConsumers for consuming input
     this.sideInputSystemConsumers.start();
 
+    TaskConfig taskConfig = new TaskConfig(this.config);
+    SamzaContainerMetrics sideInputContainerMetrics =
+        new SamzaContainerMetrics(SIDEINPUTS_METRICS_PREFIX + this.samzaContainerMetrics.source(),
+            this.samzaContainerMetrics.registry());
+
+    this.sideInputRunLoop = new RunLoop(sideInputTasks,
+        null, // all operations are executed in the main runloop thread
+        this.sideInputSystemConsumers,
+        1, // single message in flight per task
+        -1, // no windowing
+        taskConfig.getCommitMs(),
+        taskConfig.getCallbackTimeoutMs(),
+        this.config.getLong("container.disk.quota.delay.max.ms", TimeUnit.SECONDS.toMillis(1)),

Review comment:
       Can we extract this into a constant in some common file and use the constant in both places (here & SamzaContainer)?
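   
   A minimal sketch of the shared constant. The holder class `ContainerDiskQuotaConfig` and its location are hypothetical; only the key `container.disk.quota.delay.max.ms` and the one-second default come from the diff above.

```java
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Hypothetical shared holder so SamzaContainer and ContainerStorageManager read the same key and default.
final class ContainerDiskQuotaConfig {
  static final String DISK_QUOTA_DELAY_MAX_MS = "container.disk.quota.delay.max.ms";
  static final long DEFAULT_DISK_QUOTA_DELAY_MAX_MS = TimeUnit.SECONDS.toMillis(1);

  private ContainerDiskQuotaConfig() { }

  // A plain Map stands in for Samza's Config; config.getLong(key, default) plays the same role there.
  static long getMaxDiskQuotaDelayMs(Map<String, String> config) {
    String value = config.get(DISK_QUOTA_DELAY_MAX_MS);
    return value == null ? DEFAULT_DISK_QUOTA_DELAY_MAX_MS : Long.parseLong(value);
  }
}
```

   Both call sites would then reference `DISK_QUOTA_DELAY_MAX_MS` and `DEFAULT_DISK_QUOTA_DELAY_MAX_MS`, so the key and its default cannot drift apart.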

##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -119,6 +125,28 @@ public void init() {
 
     this.startingOffsets = getStartingOffsets(fileOffsets, getOldestOffsets());
     LOG.info("Starting offsets for the task {}: {}", taskName, startingOffsets);
+
+    this.sspOffsetsToBlockUntil = getOffsetsToBlockUntil();
+    LOG.info("Task {} will catch up to offsets {}", this.taskName, this.sspOffsetsToBlockUntil);
+
+    this.startingOffsets.forEach((ssp, offset) -> checkCaughtUp(ssp, offset, true));
+  }
+
+  /**
+   * Retrieves the newest offset for each SSP
+   *
+   * @return a map of SSP to newest offset
+   */
+  private Map<SystemStreamPartition, String> getOffsetsToBlockUntil() {
+    Map<SystemStreamPartition, String> offsetsToBlockUntil = new HashMap<>();
+    for (SystemStreamPartition ssp : this.sspToStores.keySet()) {
+      SystemStreamMetadata metadata = this.streamMetadataCache.getSystemStreamMetadata(ssp.getSystemStream(), false);

Review comment:
       minor: extract `false` to a meaningful name to help readability.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
