lucasbru commented on code in PR #14193:
URL: https://github.com/apache/kafka/pull/14193#discussion_r1360189812


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1707,6 +1766,44 @@ void addRecordsToTasks(final ConsumerRecords<byte[], byte[]> records) {
         }
     }
 
+    private void maybeLockTasks(final Set<TaskId> ids) {
+        if (schedulingTaskManager != null && !ids.isEmpty()) {
+            // Some tasks may be owned by the state updater and do not have to be locked in order to be committed.

Review Comment:
   Yes, I think there used to be some filtering here, but I removed it in favour 
of passing the right set of tasks from the calling context. The comment was 
therefore outdated, and I have removed it.
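
   To illustrate what "setting the right set of tasks in the calling context" 
could look like, here is a minimal sketch. It is not the code from this PR: 
the class, the `tasksToLock` and `describeLock` helpers, and the use of plain 
strings instead of `TaskId` are all hypothetical, and the idea that the caller 
excludes tasks owned by the state updater is an assumption based on the 
outdated comment.

```java
import java.util.Set;
import java.util.TreeSet;

public class CallerSideFiltering {

    // Hypothetical helper: the caller decides which tasks need locking.
    // Assumption: tasks owned by the state updater are excluded here,
    // so the locking method no longer has to filter anything.
    static Set<String> tasksToLock(final Set<String> candidates,
                                   final Set<String> ownedByStateUpdater) {
        final Set<String> result = new TreeSet<>(candidates);
        result.removeAll(ownedByStateUpdater);
        return result;
    }

    // Hypothetical stand-in for maybeLockTasks: it acts on exactly the
    // ids it is given and skips the call when the set is empty.
    static String describeLock(final Set<String> ids) {
        return ids.isEmpty() ? "nothing to lock" : "Locking tasks " + String.join(", ", ids);
    }

    public static void main(final String[] args) {
        final Set<String> toLock = tasksToLock(Set.of("0_0", "0_1", "1_0"), Set.of("0_1"));
        System.out.println(describeLock(toLock)); // prints "Locking tasks 0_0, 1_0"
    }
}
```

   With the filtering done by the caller, a comment about the state updater 
inside the locking method has nothing left to describe.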



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1707,6 +1766,44 @@ void addRecordsToTasks(final ConsumerRecords<byte[], byte[]> records) {
         }
     }
 
+    private void maybeLockTasks(final Set<TaskId> ids) {
+        if (schedulingTaskManager != null && !ids.isEmpty()) {
+            // Some tasks may be owned by the state updater and do not have to be locked in order to be committed.
+            if (log.isDebugEnabled()) {
+                log.debug("Locking tasks {}", ids.stream().map(TaskId::toString).collect(Collectors.joining(", ")));
+            }
+            boolean locked = false;
+            while (!locked) {
+                try {
+                    schedulingTaskManager.lockTasks(ids).get();
+                    locked = true;
+                } catch (final InterruptedException e) {
+                    log.warn("Interrupted while waiting for tasks {} to be locked",
+                        ids.stream().map(TaskId::toString).collect(Collectors.joining(",")));
+                } catch (final ExecutionException e) {
+                    log.info("Failed to lock tasks");
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+    }
+
+    private void maybeUnlockTasks(final Set<TaskId> ids) {
+        if (schedulingTaskManager != null && !ids.isEmpty()) {
+            // Some tasks may be owned by the state updater and do not have to be locked in order to be committed.

Review Comment:
   Done
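
   For readers following the retry loop in `maybeLockTasks`, here is a 
self-contained sketch of the same pattern: wait on a lock future, retry when 
interrupted, and rethrow on failure. It is a sketch under assumptions, not the 
PR's code. The class name, the `awaitLock` helper, and the `Function` parameter 
standing in for `schedulingTaskManager.lockTasks` are hypothetical. It also 
restores the thread's interrupt flag after the loop, which the quoted diff 
does not do.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Function;

public class LockRetrySketch {

    // Hypothetical helper: blocks until the future returned by lockTasks completes.
    // Returns false when there is nothing to lock, true once the lock was acquired.
    static <T> boolean awaitLock(final Set<T> ids,
                                 final Function<Set<T>, Future<Void>> lockTasks) {
        if (ids.isEmpty()) {
            return false;
        }
        boolean interrupted = false;
        try {
            while (true) {
                try {
                    lockTasks.apply(ids).get();
                    return true;
                } catch (final InterruptedException e) {
                    // Keep waiting, as in the quoted loop, but remember the interrupt.
                    interrupted = true;
                } catch (final ExecutionException e) {
                    throw new RuntimeException(e);
                }
            }
        } finally {
            // Deviation from the quoted diff: restore the interrupt flag for callers.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(final String[] args) {
        // An already completed future stands in for a lock that is granted immediately.
        final boolean locked = awaitLock(Set.of("0_0", "0_1"),
            ids -> CompletableFuture.completedFuture(null));
        System.out.println("locked = " + locked);
    }
}
```

   A caller would typically pair such a lock call with the matching unlock in 
a `finally` block, so that the tasks are released even if the commit throws.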



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
