guozhangwang commented on a change in pull request #11362:
URL: https://github.com/apache/kafka/pull/11362#discussion_r717027686
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -189,14 +189,13 @@ void handleCorruption(final Set<TaskId> corruptedTasks) {
// We need to commit before closing the corrupted active tasks since
this will force the ongoing txn to abort
try {
- commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(tasks()
Review comment:
No logical change here, just renamings.
##########
File path:
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
##########
@@ -735,7 +735,6 @@ public void
shouldCommitNonCorruptedTasksOnTaskCorruptedException() {
topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(),
anyString());
expectLastCall().anyTimes();
expectRestoreToBeCompleted(consumer, changeLogReader);
- consumer.commitSync(eq(emptyMap()));
Review comment:
These unit tests mocking are changed since originally we would send
empty offsets.
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1084,26 +1083,27 @@ int commit(final Collection<Task> tasksToCommit) {
* or if the task producer got fenced (EOS)
* @throws TimeoutException if committing offsets failed due to
TimeoutException (non-EOS)
* @throws TaskCorruptedException if committing offsets failed due to
TimeoutException (EOS)
- * @param consumedOffsetsAndMetadataPerTask an empty map that will be
filled in with the prepared offsets
+ * @param consumedOffsetsAndMetadata an empty map that will be filled in
with the prepared offsets
* @return number of committed offsets, or -1 if we are in the middle of a
rebalance and cannot commit
*/
- private int commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(final
Collection<Task> tasksToCommit,
- final
Map<Task, Map<TopicPartition, OffsetAndMetadata>>
consumedOffsetsAndMetadataPerTask) {
+ private int commitTasksAndMaybeUpdateCommittableOffsets(final
Collection<Task> tasksToCommit,
+ final Map<Task,
Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadata) {
if (rebalanceInProgress) {
return -1;
}
int committed = 0;
for (final Task task : tasksToCommit) {
+ // we need to call commitNeeded first since we need to update
committable offsets
if (task.commitNeeded()) {
final Map<TopicPartition, OffsetAndMetadata> offsetAndMetadata
= task.prepareCommit();
- if (task.isActive()) {
Review comment:
We do not need to check on `task.isActive()` since for standby tasks, it
would always return empty offset map. Instead we should just make sure that the
map is not empty (for active tasks, they are possibly empty depending on their
states) before adding to the map.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]