guozhangwang commented on a change in pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#discussion_r466108344
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -479,24 +512,20 @@ boolean tryToCompleteRestoration() {
void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
final Set<TopicPartition> remainingRevokedPartitions = new
HashSet<>(revokedPartitions);
- final Set<Task> revokedTasks = new HashSet<>();
- final Set<Task> additionalTasksForCommitting = new HashSet<>();
- final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>>
consumedOffsetsAndMetadataPerTask = new HashMap<>();
-
+ final Set<Task> revokedActiveTasks = new HashSet<>();
+ final Set<Task> nonRevokedActiveTasks = new HashSet<>();
+ final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>>
consumedOffsetsPerTask = new HashMap<>();
final AtomicReference<RuntimeException> firstException = new
AtomicReference<>(null);
+
for (final Task task : activeTaskIterable()) {
if
(remainingRevokedPartitions.containsAll(task.inputPartitions())) {
- try {
- task.suspend();
- revokedTasks.add(task);
- } catch (final RuntimeException e) {
- log.error("Caught the following exception while trying to
suspend revoked task " + task.id(), e);
- firstException.compareAndSet(null, new
StreamsException("Failed to suspend " + task.id(), e));
- }
+ // when the task input partitions are included in the revoked
list,
+ // this is an active task and should be revoked
+ revokedActiveTasks.add(task);
+ remainingRevokedPartitions.removeAll(task.inputPartitions());
} else if (task.commitNeeded()) {
- additionalTasksForCommitting.add(task);
+ nonRevokedActiveTasks.add(task);
Review comment:
Yeah I think I agree with you -- after a second thought I think this
renaming is not very accurate. Will call it `commitNeededActiveTasks`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]