[GitHub] [kafka] lucasbru commented on a diff in pull request #13925: KAFKA-10199: Consider tasks in state updater when computing offset sums

2023-06-29 Thread via GitHub


lucasbru commented on code in PR #13925:
URL: https://github.com/apache/kafka/pull/13925#discussion_r1246298268


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -1138,28 +1138,33 @@ public void signalResume() {
     public Map<TaskId, Long> getTaskOffsetSums() {
         final Map<TaskId, Long> taskOffsetSums = new HashMap<>();
 
-        // Not all tasks will create directories, and there may be directories for tasks we don't currently own,
-        // so we consider all tasks that are either owned or on disk. This includes stateless tasks, which should
-        // just have an empty changelogOffsets map.
-        for (final TaskId id : union(HashSet::new, lockedTaskDirectories, tasks.allTaskIds())) {
-            final Task task = tasks.contains(id) ? tasks.task(id) : null;
-            // Closed and uninitialized tasks don't have any offsets so we should read directly from the checkpoint
-            if (task != null && task.state() != State.CREATED && task.state() != State.CLOSED) {
+        final Map<TaskId, Task> tasks = allTasks();
+        final Set<TaskId> lockedTaskDirectoriesOfNonOwnedTasksAndClosedAndCreatedTasks =

Review Comment:
   What could break the simplified code is a future decision not to release the lock before transitioning to the `CLOSED` state.
   
   So yes, being defensive here and also going through all `CREATED` and `CLOSED` tasks, to make sure that none of them has a state directory that is locked but missing from `lockedTaskDirectories`, sounds good to me as well.
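
   A minimal sketch of that defensive approach, using simplified stand-ins rather than the real `TaskManager` types: task IDs are plain strings, `State` is a reduced hypothetical enum, and `checkpointCandidates` is an illustrative name, not a method from the PR.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class DefensiveCandidatesSketch {
    // Reduced stand-in for the task state; only CREATED/CLOSED vs. everything else matters here.
    enum State { CREATED, RUNNING, CLOSED }

    // Returns the task IDs whose offsets should be read from a checkpoint file.
    static Set<String> checkpointCandidates(final Map<String, State> ownedTasks,
                                            final Set<String> lockedTaskDirectories) {
        // Start from every directory we hold a lock on (this covers non-owned tasks).
        final Set<String> candidates = new HashSet<>(lockedTaskDirectories);
        for (final Map.Entry<String, State> entry : ownedTasks.entrySet()) {
            final State state = entry.getValue();
            if (state == State.CREATED || state == State.CLOSED) {
                // Defensive: include the task even if its lock is not tracked in lockedTaskDirectories.
                candidates.add(entry.getKey());
            } else {
                // The task can report its own changelog offsets, so no checkpoint read is needed.
                candidates.remove(entry.getKey());
            }
        }
        return candidates;
    }
}
```

   With this shape, the result does not depend on whether a `CLOSED` task's lock is still tracked, which is the failure mode described above.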



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lucasbru commented on a diff in pull request #13925: KAFKA-10199: Consider tasks in state updater when computing offset sums

2023-06-28 Thread via GitHub


lucasbru commented on code in PR #13925:
URL: https://github.com/apache/kafka/pull/13925#discussion_r1245664165


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -1138,28 +1138,33 @@ public void signalResume() {
     public Map<TaskId, Long> getTaskOffsetSums() {
         final Map<TaskId, Long> taskOffsetSums = new HashMap<>();
 
-        // Not all tasks will create directories, and there may be directories for tasks we don't currently own,
-        // so we consider all tasks that are either owned or on disk. This includes stateless tasks, which should
-        // just have an empty changelogOffsets map.
-        for (final TaskId id : union(HashSet::new, lockedTaskDirectories, tasks.allTaskIds())) {
-            final Task task = tasks.contains(id) ? tasks.task(id) : null;
-            // Closed and uninitialized tasks don't have any offsets so we should read directly from the checkpoint
-            if (task != null && task.state() != State.CREATED && task.state() != State.CLOSED) {
+        final Map<TaskId, Task> tasks = allTasks();
+        final Set<TaskId> lockedTaskDirectoriesOfNonOwnedTasksAndClosedAndCreatedTasks =

Review Comment:
   Well, if there is no task directory, there is no checkpoint to process, so it is safe to do nothing in this case.
   
   All you'd achieve by adding more tasks is that they are skipped later by the `checkpointFile.exists()` check.
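
   A rough illustration of why the extra IDs are harmless. This is a sketch under assumptions: the directory layout (`<stateDir>/<taskId>/.checkpoint`) and the one-offset-per-line file format are hypothetical simplifications, standing in for `stateDirectory.checkpointFileFor(id)` and `OffsetCheckpoint` in the real code.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class CheckpointSkipSketch {
    // Hypothetical layout: <stateDir>/<taskId>/.checkpoint
    static File checkpointFileFor(final File stateDir, final String taskId) {
        return new File(new File(stateDir, taskId), ".checkpoint");
    }

    // Sums the offsets found in each task's checkpoint file (assumed: one offset per line).
    static Map<String, Long> offsetSums(final File stateDir, final Set<String> taskIds) throws IOException {
        final Map<String, Long> sums = new HashMap<>();
        for (final String id : taskIds) {
            final File checkpointFile = checkpointFileFor(stateDir, id);
            // No task directory means no checkpoint file, so the task is simply skipped.
            if (checkpointFile.exists()) {
                long sum = 0L;
                for (final String line : Files.readAllLines(checkpointFile.toPath())) {
                    if (!line.trim().isEmpty()) {
                        sum += Long.parseLong(line.trim());
                    }
                }
                sums.put(id, sum);
            }
        }
        return sums;
    }
}
```

   Passing an ID without a directory costs one `exists()` call and contributes nothing to the result.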






[GitHub] [kafka] lucasbru commented on a diff in pull request #13925: KAFKA-10199: Consider tasks in state updater when computing offset sums

2023-06-28 Thread via GitHub


lucasbru commented on code in PR #13925:
URL: https://github.com/apache/kafka/pull/13925#discussion_r1245427580


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -1138,28 +1138,33 @@ public void signalResume() {
     public Map<TaskId, Long> getTaskOffsetSums() {
         final Map<TaskId, Long> taskOffsetSums = new HashMap<>();
 
-        // Not all tasks will create directories, and there may be directories for tasks we don't currently own,
-        // so we consider all tasks that are either owned or on disk. This includes stateless tasks, which should
-        // just have an empty changelogOffsets map.
-        for (final TaskId id : union(HashSet::new, lockedTaskDirectories, tasks.allTaskIds())) {
-            final Task task = tasks.contains(id) ? tasks.task(id) : null;
-            // Closed and uninitialized tasks don't have any offsets so we should read directly from the checkpoint
-            if (task != null && task.state() != State.CREATED && task.state() != State.CLOSED) {
+        final Map<TaskId, Task> tasks = allTasks();
+        final Set<TaskId> lockedTaskDirectoriesOfNonOwnedTasksAndClosedAndCreatedTasks =

Review Comment:
   Ah, I recommended this change thinking that `lockedTaskDirectories` always includes all `ClosedAndCreatedTasks`. I think it does, right? If so, it should be enough to assign this to `lockedTaskDirectories`.






[GitHub] [kafka] lucasbru commented on a diff in pull request #13925: KAFKA-10199: Consider tasks in state updater when computing offset sums

2023-06-28 Thread via GitHub


lucasbru commented on code in PR #13925:
URL: https://github.com/apache/kafka/pull/13925#discussion_r1245383102


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -1138,28 +1138,35 @@ public void signalResume() {
     public Map<TaskId, Long> getTaskOffsetSums() {
         final Map<TaskId, Long> taskOffsetSums = new HashMap<>();
 
-        // Not all tasks will create directories, and there may be directories for tasks we don't currently own,
-        // so we consider all tasks that are either owned or on disk. This includes stateless tasks, which should
-        // just have an empty changelogOffsets map.
-        for (final TaskId id : union(HashSet::new, lockedTaskDirectories, tasks.allTaskIds())) {
-            final Task task = tasks.contains(id) ? tasks.task(id) : null;
-            // Closed and uninitialized tasks don't have any offsets so we should read directly from the checkpoint
-            if (task != null && task.state() != State.CREATED && task.state() != State.CLOSED) {
+        final Map<TaskId, Task> tasks = allTasks();
+        final Set<TaskId> createdAndClosedTasks = new HashSet<>();
+        for (final Task task : tasks.values()) {
+            if (task.state() != State.CREATED && task.state() != State.CLOSED) {
                 final Map<TopicPartition, Long> changelogOffsets = task.changelogOffsets();
                 if (changelogOffsets.isEmpty()) {
-                    log.debug("Skipping to encode apparently stateless (or non-logged) offset sum for task {}", id);
+                    log.debug("Skipping to encode apparently stateless (or non-logged) offset sum for task {}",
+                        task.id());
                 } else {
-                    taskOffsetSums.put(id, sumOfChangelogOffsets(id, changelogOffsets));
+                    taskOffsetSums.put(task.id(), sumOfChangelogOffsets(task.id(), changelogOffsets));
                 }
             } else {
-                final File checkpointFile = stateDirectory.checkpointFileFor(id);
-                try {
-                    if (checkpointFile.exists()) {
-                        taskOffsetSums.put(id, sumOfChangelogOffsets(id, new OffsetCheckpoint(checkpointFile).read()));
-                    }
-                } catch (final IOException e) {
-                    log.warn(String.format("Exception caught while trying to read checkpoint for task %s:", id), e);
+                createdAndClosedTasks.add(task.id());

Review Comment:
   nit: if you want to do it with fewer collections, you could initialize `lockedTaskDirectoriesOfNonOwnedTasks` earlier and remove directly from that set in the `if` branch, instead of adding to `createdAndClosedTasks` in the `else` branch.
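
   Roughly what I have in mind, as a self-contained sketch with simplified stand-in types. `SketchTask`, the string task IDs, and the precomputed `offsetSum` field are hypothetical; the real code would use `Task`, `TaskId`, and `sumOfChangelogOffsets`.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class FewerCollectionsSketch {
    // Reduced stand-in for the task state.
    enum State { CREATED, RUNNING, CLOSED }

    // Hypothetical stand-in for a task: an ID, a state, and a precomputed offset sum.
    static final class SketchTask {
        final String id;
        final State state;
        final long offsetSum;

        SketchTask(final String id, final State state, final long offsetSum) {
            this.id = id;
            this.state = state;
            this.offsetSum = offsetSum;
        }
    }

    // Fills taskOffsetSums for initialized tasks and returns the IDs that
    // still need to be read from checkpoint files.
    static Set<String> collect(final List<SketchTask> tasks,
                               final Set<String> lockedTaskDirectories,
                               final Map<String, Long> taskOffsetSums) {
        // Initialize the set up front from the locked directories ...
        final Set<String> lockedTaskDirectoriesOfNonOwnedTasks = new HashSet<>(lockedTaskDirectories);
        for (final SketchTask task : tasks) {
            if (task.state != State.CREATED && task.state != State.CLOSED) {
                taskOffsetSums.put(task.id, task.offsetSum);
                // ... and remove directly in the `if` branch, so neither an `else` branch
                // nor a separate createdAndClosedTasks set is needed.
                lockedTaskDirectoriesOfNonOwnedTasks.remove(task.id);
            }
        }
        return lockedTaskDirectoriesOfNonOwnedTasks;
    }
}
```

   Note that this variant only picks up `CREATED` and `CLOSED` tasks whose directories are already in `lockedTaskDirectories`.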


