lucasbru commented on code in PR #14193: URL: https://github.com/apache/kafka/pull/14193#discussion_r1360189812
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1707,6 +1766,44 @@ void addRecordsToTasks(final ConsumerRecords<byte[], byte[]> records) {
         }
     }
 
+    private void maybeLockTasks(final Set<TaskId> ids) {
+        if (schedulingTaskManager != null && !ids.isEmpty()) {
+            // Some tasks may be owned by the state updater and do not have to be locked in order to be committed.

Review Comment:
   Yeah, I think there was some filtering here but I removed it in favour of setting the right set of tasks in the calling context. So the comment is just outdated and I removed it.

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1707,6 +1766,44 @@ void addRecordsToTasks(final ConsumerRecords<byte[], byte[]> records) {
         }
     }
 
+    private void maybeLockTasks(final Set<TaskId> ids) {
+        if (schedulingTaskManager != null && !ids.isEmpty()) {
+            // Some tasks may be owned by the state updater and do not have to be locked in order to be committed.
+            if (log.isDebugEnabled()) {
+                log.debug("Locking tasks {}", ids.stream().map(TaskId::toString).collect(Collectors.joining(", ")));
+            }
+            boolean locked = false;
+            while (!locked) {
+                try {
+                    schedulingTaskManager.lockTasks(ids).get();
+                    locked = true;
+                } catch (final InterruptedException e) {
+                    log.warn("Interrupted while waiting for tasks {} to be locked",
+                        ids.stream().map(TaskId::toString).collect(Collectors.joining(",")));
+                } catch (final ExecutionException e) {
+                    log.info("Failed to lock tasks");
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+    }
+
+    private void maybeUnlockTasks(final Set<TaskId> ids) {
+        if (schedulingTaskManager != null && !ids.isEmpty()) {
+            // Some tasks may be owned by the state updater and do not have to be locked in order to be committed.

Review Comment:
   Done

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org