[ https://issues.apache.org/jira/browse/KAFKA-14847?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-14847:
------------------------------------
    Description: 
Today, EOS-v2/v1 and ALOS share the same internal call path inside 
TaskManager/TaskExecutor for committing tasks in various scenarios. This call 
path, {{commitTasksAndMaybeUpdateCommitableOffsets}} -> 
{{commitOffsetsOrTransaction}}, takes a list of tasks as its input, which can 
be a subset of the tasks that the thread / task manager owns. For EOS-v1 / 
ALOS, committing just a subset of the tasks is fine. For EOS-v2, however, all 
tasks participate in the same transaction, so committing only a subset could 
violate exactly-once semantics: the transaction commits the output records of 
every task, but the input offsets only of the tasks in the list. Today we rely 
on every caller of the commit function to make sure that the list of tasks it 
passes in does not violate the semantics under EOS-v2. As Matthias 
summarized, the commit path can currently be triggered in the following cases:

1) Inside handleRevocation() – this is a clean path, and we add all non-revoked 
tasks with the commitNeeded() flag set to the commit – so this seems to be fine.
2) tryCloseCleanAllActiveTasks() – here we only call it if 
tasksToCloseDirty.isEmpty() – so it seems fine, too.
3) commit() with a list of tasks handed in – we call commit() inside the TM 
three times:
3.a) inside commitAll() as commit(tasks.values()) (passing in all tasks)
3.b) inside maybeCommitActiveTasksPerUserRequested as 
commit(activeTaskIterable()); (passing in all tasks)
3.c) inside handleCorruption() – here, we only consider RUNNING and RESTORING 
tasks that are not corrupted – note that we only throw a TaskCorruptedException 
during restore state initialization; thus, corrupted tasks have not processed 
anything yet, and all other tasks should be clean and safe to commit.
3.d) commitSuccessfullyProcessedTasks() – under ALOS/EOS-v1, we commit the 
source offsets of only a subset of the tasks, while still committing the 
outgoing records of the unsuccessful tasks, if there are any. (For EOS-v2, the 
list of tasks should be empty.)
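
A toy model may help show why passing a subset is harmless under EOS-v1 but 
not under EOS-v2. It is only a sketch: {{SubsetCommitModel}} and its methods 
are hypothetical, not Kafka Streams code, and the model tracks nothing but 
which tasks have uncommitted output. It assumes EOS-v1 uses one producer 
transaction per task and EOS-v2 one transaction per thread, and it reports 
the tasks whose output would be committed without their input offsets (and 
would therefore be reprocessed, producing duplicates, after a crash).

```java
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical toy model, not Kafka Streams code: it only tracks which tasks
// have output records sitting in a not-yet-committed transaction.
final class SubsetCommitModel {

    // true = EOS-v2 (one transaction per thread, shared by all tasks),
    // false = EOS-v1 (one producer and transaction per task)
    private final boolean sharedTransaction;
    private final Set<String> tasksWithUncommittedOutput = new TreeSet<>();

    SubsetCommitModel(boolean sharedTransaction) {
        this.sharedTransaction = sharedTransaction;
    }

    /** A task processes a record and writes output into its open transaction. */
    void process(String taskId) {
        tasksWithUncommittedOutput.add(taskId);
    }

    /**
     * Commits with input offsets for tasksToCommit only. Returns the tasks
     * whose output was committed although their input offsets were not.
     */
    Set<String> commit(Collection<String> tasksToCommit) {
        Set<String> outputWithoutOffsets = new TreeSet<>();
        if (sharedTransaction) {
            // EOS-v2: committing the shared transaction commits every task's
            // output, but offsets were only sent for the listed tasks.
            outputWithoutOffsets.addAll(tasksWithUncommittedOutput);
            outputWithoutOffsets.removeAll(tasksToCommit);
            tasksWithUncommittedOutput.clear();
        } else {
            // EOS-v1: only the listed tasks' own transactions are committed;
            // the other tasks keep their output pending in their own transactions.
            tasksWithUncommittedOutput.removeAll(tasksToCommit);
        }
        return outputWithoutOffsets;
    }

    public static void main(String[] args) {
        for (boolean shared : new boolean[] {true, false}) {
            SubsetCommitModel model = new SubsetCommitModel(shared);
            model.process("0_0");
            model.process("0_1");
            // Commit only task 0_0, as a caller passing a subset would.
            Set<String> unsafe = model.commit(Set.of("0_0"));
            System.out.println((shared ? "EOS-v2: " : "EOS-v1: ") + unsafe);
        }
    }
}
```

Running {{main}} prints {{EOS-v2: [0_1]}} and {{EOS-v1: []}}: the same subset 
commit that is harmless under EOS-v1 commits task 0_1's output without its 
offsets under EOS-v2.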

Just going through this list of callers, as demonstrated above, is already 
pretty complex and very vulnerable to bugs. It's better to make the callee, 
rather than the callers, responsible for guaranteeing this. More concretely, I 
think we can introduce a new function called {{commitAllTasks}} such that, 
under EOS-v2, callers always call {{commitAllTasks}} instead; if there are 
some tasks that should not be committed because we know they have not 
processed any data, the {{commitAllTasks}} callee itself would filter them out 
internally.
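
A minimal sketch of the proposed split, under a simplified model: the 
{{Task}} class, its {{processedDataSinceLastCommit}} flag, the {{Mode}} enum 
and the return values are hypothetical stand-ins, not Kafka's 
{{TaskManager}}/{{TaskExecutor}} API, and the actual commit is replaced by 
returning the ids that would be committed. Under EOS-v2 there is one entry 
point that always looks at every owned task and does the filtering itself; 
the subset variant is only accepted for ALOS.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical sketch of the proposed callee-side filtering; not Kafka code.
final class CommitPathSketch {

    enum Mode { AT_LEAST_ONCE, EXACTLY_ONCE_V2 }

    /** Minimal stand-in for a stream task (hypothetical). */
    static final class Task {
        final String id;
        final boolean processedDataSinceLastCommit;

        Task(String id, boolean processedDataSinceLastCommit) {
            this.id = id;
            this.processedDataSinceLastCommit = processedDataSinceLastCommit;
        }
    }

    private final Mode mode;
    private final List<Task> ownedTasks; // every task owned by this thread

    CommitPathSketch(Mode mode, List<Task> ownedTasks) {
        this.mode = mode;
        this.ownedTasks = new ArrayList<>(ownedTasks);
    }

    /**
     * EOS-v2 entry point: callers cannot pass a list; the callee considers all
     * owned tasks and returns the ids whose offsets would go into the txn.
     */
    List<String> commitAllTasks() {
        List<String> committed = new ArrayList<>();
        for (Task task : ownedTasks) {
            // A task that has processed nothing has no output in the shared
            // transaction, so leaving it out cannot split input from output.
            if (task.processedDataSinceLastCommit) {
                committed.add(task.id);
            }
        }
        return committed;
    }

    /** Subset commit: only allowed when tasks do not share one transaction. */
    List<String> commitTasks(Collection<Task> tasks) {
        if (mode == Mode.EXACTLY_ONCE_V2) {
            throw new IllegalStateException("under EOS-v2, use commitAllTasks()");
        }
        List<String> committed = new ArrayList<>();
        for (Task task : tasks) {
            committed.add(task.id);
        }
        return committed;
    }
}
```

With tasks 0_0 and 1_0 having processed data and 0_1 idle, 
{{commitAllTasks()}} under EOS-v2 returns {{[0_0, 1_0]}}, while 
{{commitTasks(...)}} throws; under ALOS, {{commitTasks}} still accepts a 
subset. Because the callee is the only place deciding which tasks may be 
skipped, the per-caller reasoning for each call site is no longer needed.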

Given its scope, I think it's better to do this refactoring after EOS-v1 is 
removed.

  was:
Today, EOS-v2/v1 and ALOS share the same internal call path inside 
TaskManager/TaskExecutor for committing tasks in various scenarios. This call 
path, {{commitTasksAndMaybeUpdateCommitableOffsets}} -> 
{{commitOffsetsOrTransaction}}, takes a list of tasks as its input, which can 
be a subset of the tasks that the thread / task manager owns. For EOS-v1 / 
ALOS, committing just a subset of the tasks is fine. For EOS-v2, however, all 
tasks participate in the same transaction, so committing only a subset could 
violate exactly-once semantics: the transaction commits the output records of 
every task, but the input offsets only of the tasks in the list. Today we rely 
on every caller of the commit function to make sure that the list of tasks it 
passes in does not violate the semantics under EOS-v2. As Matthias 
summarized, the commit path can currently be triggered in the following cases:

1) Inside handleRevocation() – this is a clean path, and we add all non-revoked 
tasks with the commitNeeded() flag set to the commit – so this seems to be fine.
2) tryCloseCleanAllActiveTasks() – here we only call it if 
tasksToCloseDirty.isEmpty() – so it seems fine, too.
3) commit() with a list of tasks handed in – we call commit() inside the TM 
three times:
3.a) inside commitAll() as commit(tasks.values()) (passing in all tasks)
3.b) inside maybeCommitActiveTasksPerUserRequested as 
commit(activeTaskIterable()); (passing in all tasks)
3.c) inside handleCorruption() – here, we only consider RUNNING and RESTORING 
tasks that are not corrupted – note that we only throw a TaskCorruptedException 
during restore state initialization; thus, corrupted tasks have not processed 
anything yet, and all other tasks should be clean and safe to commit.
3.d) commitSuccessfullyProcessedTasks() – under ALOS/EOS-v1, we commit the 
source offsets of only a subset of the tasks, while still committing the 
outgoing records of the unsuccessful tasks, if there are any.

Just going through this list of callers, as demonstrated above, is already 
pretty complex and very vulnerable to bugs. It's better to make the callee, 
rather than the callers, responsible for guaranteeing this. More concretely, I 
think we can introduce a new function called {{commitAllTasks}} such that, 
under EOS-v2, callers always call {{commitAllTasks}} instead; if there are 
some tasks that should not be committed because we know they have not 
processed any data, the {{commitAllTasks}} callee itself would filter them out 
internally.

Given its scope, I think it's better to do this refactoring after EOS-v1 is 
removed.


> Separate the callers of commitAllTasks v.s. commitTasks for EOS(-v2) and ALOS
> -----------------------------------------------------------------------------
>
>                 Key: KAFKA-14847
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14847
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Guozhang Wang
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
