[GitHub] [kafka] guozhangwang commented on a diff in pull request #13025: KAFKA-14299: Fix pause and resume with state updater

2023-02-16 Thread via GitHub


guozhangwang commented on code in PR #13025:
URL: https://github.com/apache/kafka/pull/13025#discussion_r1109175366


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##
@@ -3159,7 +3159,7 @@ private void addRecord(final MockConsumer mockConsumer,
 }
 
 StreamTask activeTask(final TaskManager taskManager, final TopicPartition partition) {
-final Stream standbys = taskManager.allTasks().values().stream().filter(Task::isActive);
+final Stream standbys = taskManager.allOwnedTasks().values().stream().filter(Task::isActive);

Review Comment:
   For my understanding: why do we need to change the callee here as well?



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/ReadOnlyTaskTest.java:
##
@@ -41,7 +41,6 @@ class ReadOnlyTaskTest {
 add("changelogPartitions");
 add("commitRequested");
 add("isActive");
-add("commitNeeded");

Review Comment:
   Good to know :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a diff in pull request #13025: KAFKA-14299: Fix pause and resume with state updater

2023-02-14 Thread via GitHub


guozhangwang commented on code in PR #13025:
URL: https://github.com/apache/kafka/pull/13025#discussion_r1102201605


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##
@@ -258,21 +269,27 @@ private List getTasksAndActions() {
 }
 
 private void addTask(final Task task) {
+final TaskId taskId = task.id();
 if (isStateless(task)) {
 addToRestoredTasks((StreamTask) task);
-log.info("Stateless active task " + task.id() + " was added to the restored tasks of the state updater");
+log.info("Stateless active task " + taskId + " was added to the restored tasks of the state updater");
+} else if (topologyMetadata.isPaused(taskId.topologyName())) {
+pausedTasks.put(taskId, task);

Review Comment:
   While working on another PR, I realized that in `checkAllUpdatingTaskStates` we only try to pause tasks once the commit interval has elapsed. That is, when we pause a named topology, its tasks may only be paused after a delay, whereas when we resume, tasks are resumed immediately.
   
   So I think we should also move the pausing logic out of `checkAllUpdatingTaskStates`, as we did for resuming, which would reduce `checkAllUpdatingTaskStates` to just `maybeCheckpointUpdatingTasks`. WDYT?
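A minimal sketch of the proposal, assuming a heavily simplified model: tasks are plain `String` IDs mapped to their topology name, and `pausedTopologies` stands in for `topologyMetadata.isPaused(...)`. The class, `runOnce`, `pauseTasks` and `resumeTasks` are hypothetical; only `maybeCheckpointUpdatingTasks` echoes the name suggested above. Pausing and resuming both take effect on every loop iteration, and only checkpointing stays gated by the commit interval.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified state updater loop; not Kafka's DefaultStateUpdater.
class StateUpdaterSketch {
    // Stand-in for topologyMetadata.isPaused(topologyName).
    final Set<String> pausedTopologies = new HashSet<>();
    // taskId -> topologyName (the real updater stores Task objects instead).
    final Map<String, String> updatingTasks = new HashMap<>();
    final Map<String, String> pausedTasks = new HashMap<>();
    final long commitIntervalMs;
    long lastCheckpointMs = 0L;
    int checkpointCount = 0;

    StateUpdaterSketch(final long commitIntervalMs) {
        this.commitIntervalMs = commitIntervalMs;
    }

    // New tasks of a paused topology go straight to the paused set.
    void addTask(final String taskId, final String topologyName) {
        if (pausedTopologies.contains(topologyName)) {
            pausedTasks.put(taskId, topologyName);
        } else {
            updatingTasks.put(taskId, topologyName);
        }
    }

    // One loop iteration: pause and resume immediately, checkpoint only when the interval elapsed.
    void runOnce(final long nowMs) {
        pauseTasks();
        resumeTasks();
        maybeCheckpointUpdatingTasks(nowMs);
    }

    private void pauseTasks() {
        updatingTasks.entrySet().removeIf(e -> {
            if (pausedTopologies.contains(e.getValue())) {
                pausedTasks.put(e.getKey(), e.getValue());
                return true;
            }
            return false;
        });
    }

    private void resumeTasks() {
        pausedTasks.entrySet().removeIf(e -> {
            if (!pausedTopologies.contains(e.getValue())) {
                updatingTasks.put(e.getKey(), e.getValue());
                return true;
            }
            return false;
        });
    }

    // The only work still gated by the commit interval.
    private void maybeCheckpointUpdatingTasks(final long nowMs) {
        if (nowMs - lastCheckpointMs >= commitIntervalMs) {
            checkpointCount++; // stand-in for writing checkpoints of updating tasks
            lastCheckpointMs = nowMs;
        }
    }
}
```

With this split, pausing a topology moves its tasks out of `updatingTasks` on the very next iteration, symmetric with resuming, regardless of how long the commit interval is.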



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java:
##
@@ -190,7 +190,7 @@ public void clearTaskTimeout() {
 
 @Override
 public boolean commitNeeded() {
-throw new UnsupportedOperationException("This task is read-only");
+return task.commitNeeded();

Review Comment:
   I think the actual culprit is, as @lucasbru said, that `StreamThread.maybeCommit` uses `TaskManager.allTasks`, while `TaskManager.allTasks` itself is used in many different places. For example, it is used for IQ, and in that case it should return all tasks, whereas for this caller `TaskManager.allTasks` should return only the processing tasks when the state updater is enabled, i.e. only the ones in `TaskRegistry`.
   
   > I am also wondering why Streams also commits restoring tasks.
   
   As for that, the rationale is to record restoration progress as well, in case restoration was paused due to another rebalance (though `prepareCommit` would always return an empty map for those tasks, so committing a restoring task reduces to writing checkpoints). But at some point in the history we lost the logic that ever sets the `commitNeeded` flag for restoring tasks, so they would never trigger a commit. Hence a new JIRA was proposed to fix it, and we fixed it in the state updater.
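A minimal sketch of what "committing" a restoring task amounts to, assuming a simplified task that tracks a single restored offset. The class, `onRestoredUpTo`, `postCommit` and `checkpointedOffset` are hypothetical, and `prepareCommit` returns a `Map<String, Long>` purely for illustration. Only the idea comes from the comment above: restoration progress sets `commitNeeded`, `prepareCommit` yields nothing, and the commit boils down to a checkpoint.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical, simplified restoring task; not Kafka's StreamTask.
class RestoringTaskSketch {
    private boolean commitNeeded = false;
    private long restoredOffset = -1L;
    private long checkpointedOffset = -1L;

    // Restoration progress marks the task as needing a commit (the flag update that was lost).
    void onRestoredUpTo(final long offset) {
        if (offset > restoredOffset) {
            restoredOffset = offset;
            commitNeeded = true;
        }
    }

    boolean commitNeeded() {
        return commitNeeded;
    }

    // A restoring task has no consumed input offsets to commit, so this is always empty.
    Map<String, Long> prepareCommit() {
        return Collections.emptyMap();
    }

    // "Committing" reduces to checkpointing the restored position.
    void postCommit() {
        checkpointedOffset = restoredOffset;
        commitNeeded = false;
    }

    long checkpointedOffset() {
        return checkpointedOffset;
    }
}
```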
   
   I thought about fixing it in a probably better way that involves more changes: 1) introduce a `TaskManager.allProcessingTasks`, and 2) depending on whether the state updater is enabled, let `maybeCommit` call either this one or `allTasks`. Admittedly, when I thought about that I felt the change was a bit too much, and since tasks in the state updater should not cause any side effects and should return `commitNeeded` as false, this may just be okay.
   
   But thinking about this a bit more, I think it is indeed worrisome to let `ReadOnlyTask.commitNeeded` have any potential side effects if we ever have bugs.
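The two-step alternative above can be sketched as follows, under heavy simplification: task IDs are strings, `registryTasks` stands in for `TaskRegistry`, `updaterTasks` for the tasks held by the state updater, and `SimpleTask`, `tasksToCommit` and the class name are hypothetical. `allProcessingTasks` is the accessor proposed in point 1), not an existing API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the two-accessor idea; not Kafka's TaskManager.
class TaskManagerSketch {
    static final class SimpleTask {
        private final String id;
        private final boolean commitNeeded;

        SimpleTask(final String id, final boolean commitNeeded) {
            this.id = id;
            this.commitNeeded = commitNeeded;
        }

        String id() { return id; }
        boolean commitNeeded() { return commitNeeded; }
    }

    // Tasks processed by the stream thread (stand-in for TaskRegistry).
    final Map<String, SimpleTask> registryTasks = new HashMap<>();
    // Tasks owned by the state updater; null when the updater is disabled.
    final Map<String, SimpleTask> updaterTasks;

    TaskManagerSketch(final boolean stateUpdaterEnabled) {
        this.updaterTasks = stateUpdaterEnabled ? new HashMap<>() : null;
    }

    // Every task, including those in the state updater (e.g. for IQ).
    Map<String, SimpleTask> allTasks() {
        final Map<String, SimpleTask> all = new HashMap<>();
        if (updaterTasks != null) {
            all.putAll(updaterTasks);
        }
        all.putAll(registryTasks);
        return all;
    }

    // Proposed accessor: only the tasks the stream thread processes itself.
    Map<String, SimpleTask> allProcessingTasks() {
        return new HashMap<>(registryTasks);
    }

    // Commit path: with the updater enabled, updater-owned tasks are never considered.
    List<String> tasksToCommit() {
        final Map<String, SimpleTask> candidates =
            updaterTasks != null ? allProcessingTasks() : allTasks();
        final List<String> ids = new ArrayList<>();
        for (final SimpleTask task : candidates.values()) {
            if (task.commitNeeded()) {
                ids.add(task.id());
            }
        }
        return ids;
    }
}
```

In this toy model the branch only matters when the updater is enabled, which is exactly the case where an updater-owned task wrongly reporting `commitNeeded` would otherwise be committed by the stream thread.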



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java:
##
@@ -190,7 +190,7 @@ public void clearTaskTimeout() {
 
 @Override
 public boolean commitNeeded() {
-throw new UnsupportedOperationException("This task is read-only");
+return task.commitNeeded();

Review Comment:
   Ack. That makes sense.






[GitHub] [kafka] guozhangwang commented on a diff in pull request #13025: KAFKA-14299: Fix pause and resume with state updater

2023-01-31 Thread via GitHub


guozhangwang commented on code in PR #13025:
URL: https://github.com/apache/kafka/pull/13025#discussion_r1092598270


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java:
##
@@ -190,7 +190,7 @@ public void clearTaskTimeout() {
 
 @Override
 public boolean commitNeeded() {
-throw new UnsupportedOperationException("This task is read-only");
+return task.commitNeeded();

Review Comment:
   Why do we need to change these two functions?



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##
@@ -258,21 +269,27 @@ private List getTasksAndActions() {
 }
 
 private void addTask(final Task task) {
+final TaskId taskId = task.id();
 if (isStateless(task)) {
 addToRestoredTasks((StreamTask) task);
-log.info("Stateless active task " + task.id() + " was added to the restored tasks of the state updater");
+log.info("Stateless active task " + taskId + " was added to the restored tasks of the state updater");
+} else if (topologyMetadata.isPaused(taskId.topologyName())) {
+pausedTasks.put(taskId, task);

Review Comment:
   I'm wondering whether this complexity is necessary, since we do not make strict ordering guarantees for paused topologies, i.e. it's okay to keep processing those tasks for a while after `pause()` is called. Is it really a correctness or concurrency issue?



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -1534,7 +1540,13 @@ Set standbyTaskIds() {
 Map allTasks() {
 // not bothering with an unmodifiable map, since the tasks themselves are mutable, but
 // if any outside code modifies the map or the tasks, it would be a severe transgression.
-return tasks.allTasksPerId();
+if (stateUpdater != null) {
+final Map ret = stateUpdater.getTasks().stream().collect(Collectors.toMap(Task::id, x -> x));
+ret.putAll(tasks.allTasksPerId());

Review Comment:
   I've slightly changed the function name in another PR, so if that PR is merged we will need a small rebase and conflict resolution here, just FYI.


