guozhangwang commented on a change in pull request #8988: URL: https://github.com/apache/kafka/pull/8988#discussion_r488375084
##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java
##########
@@ -34,7 +34,7 @@
     final KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future;

-    ListConsumerGroupOffsetsResult(KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
+    public ListConsumerGroupOffsetsResult(KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future) {

Review comment:
This needs to be public for unit testing.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
##########
@@ -146,6 +146,7 @@ public synchronized void assign(Collection<TopicPartition> partitions) {
         ensureNotClosed();
         committed.clear();
         this.subscriptions.assignFromUser(new HashSet<>(partitions));
+        this.paused.retainAll(partitions);

Review comment:
Piggy-backed fix: when partitions are unassigned, they should also be removed from the paused list.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
##########
@@ -185,7 +181,7 @@ StreamTask createActiveTaskFromStandby(final StandbyTask standbyTask,
         final LogContext logContext = getLogContext(standbyTask.id);

         standbyTask.closeCleanAndRecycleState();
-        stateManager.transitionTaskType(TaskType.ACTIVE, logContext);
+        stateManager.prepareNewTaskType(TaskType.ACTIVE, logContext);

Review comment:
This is another tricky part: the restore thread checks the state-manager's type to decide whether to check for completion, so for recycled tasks we should not change the type immediately. The change has to wait until they have been asynchronously de-registered (and later re-registered) with the restore thread.

##########
File path: streams/src/test/java/org/apache/kafka/test/StateMachineTask.java
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.AbstractTask;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.streams.processor.internals.Task;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class StateMachineTask extends AbstractTask implements Task {

Review comment:
Extracted this mock class since it is now needed in multiple unit tests.
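To illustrate the MockConsumer fix quoted earlier (`this.paused.retainAll(partitions)`), here is a minimal sketch of the intended behavior. `PausedTracker` and the string partition names are hypothetical stand-ins invented for this example; they are not the real MockConsumer, which tracks `TopicPartition` objects and does more bookkeeping.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

class PausedTrackerDemo {
    // Hypothetical stand-in for the consumer's assignment bookkeeping.
    static final class PausedTracker {
        private final Set<String> assigned = new HashSet<>();
        private final Set<String> paused = new HashSet<>();

        void assign(Collection<String> partitions) {
            assigned.clear();
            assigned.addAll(partitions);
            // Keep only the paused entries that are still assigned.
            paused.retainAll(partitions);
        }

        void pause(Collection<String> partitions) {
            paused.addAll(partitions);
        }

        // Return a copy so callers cannot mutate internal state.
        Set<String> paused() {
            return new HashSet<>(paused);
        }
    }

    public static void main(String[] args) {
        PausedTracker tracker = new PausedTracker();
        tracker.assign(Arrays.asList("topic-0", "topic-1"));
        tracker.pause(Arrays.asList("topic-0", "topic-1"));

        // Re-assign with only one partition: the other leaves the paused set too.
        tracker.assign(Arrays.asList("topic-1"));
        System.out.println(tracker.paused());
    }
}
```

Without the `retainAll` call, `topic-0` would stay in the paused set even though it is no longer assigned.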
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -659,13 +665,12 @@ void runOnce() {
             }
         }

-        // we can always let changelog reader try restoring in order to initialize the changelogs;
-        // if there's no active restoring or standby updating it would not try to fetch any data
-        changelogReader.restore();
-
-        // TODO: we should record the restore latency and its relative time spent ratio after
-        // we figure out how to move this method out of the stream thread
-        advanceNowAndComputeLatency();
+        // check if restore thread has encountered TaskCorrupted exception; if yes
+        // rethrow it to trigger the handling logic
+        final TaskCorruptedException e = restoreThread.nextCorruptedException();

Review comment:
@ableegoldman I have not addressed this comment yet, but it is on my list for the next PR. This one has become too large, so I'm only keeping the changes necessary for correctness.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
##########
@@ -26,24 +26,18 @@ public interface ChangelogReader extends ChangelogRegister {
     /**
      * Restore all registered state stores by reading from their changelogs
+     *
+     * @return number of records restored in this call
      */
-    void restore();
-
-    /**
-     * Transit to restore active changelogs mode
-     */
-    void enforceRestoreActive();

Review comment:
With the restore changelog reader solely owned by a single thread, we no longer need to expose these as public APIs.
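The StreamThread hunk above polls the restore thread for a pending `TaskCorruptedException` and rethrows it on the stream thread. The diff does not show how `nextCorruptedException()` is implemented, so the following is only a sketch of one plausible hand-off, assuming a thread-safe queue. `RestoreSide`, `recordCorruption`, and `CorruptedException` are hypothetical names; only `nextCorruptedException` and `runOnce` appear in the diff.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class RestoreHandoffDemo {
    // Hypothetical stand-in for TaskCorruptedException.
    static final class CorruptedException extends RuntimeException {
        CorruptedException(String message) {
            super(message);
        }
    }

    // Hypothetical restore-side holder; the queue is an assumption of this sketch.
    static final class RestoreSide {
        private final Queue<CorruptedException> corrupted = new ConcurrentLinkedQueue<>();

        // Called on the restore thread when a changelog turns out to be corrupted.
        void recordCorruption(CorruptedException e) {
            corrupted.offer(e);
        }

        // Called on the stream thread; returns null when nothing is pending.
        CorruptedException nextCorruptedException() {
            return corrupted.poll();
        }
    }

    // One iteration of the stream thread's loop, reduced to the hand-off check.
    static void runOnce(RestoreSide restoreSide) {
        CorruptedException e = restoreSide.nextCorruptedException();
        if (e != null) {
            throw e; // rethrow on the stream thread to trigger the handling logic
        }
        // ... normal processing would continue here
    }

    public static void main(String[] args) throws InterruptedException {
        RestoreSide restoreSide = new RestoreSide();
        Thread restoreThread = new Thread(
            () -> restoreSide.recordCorruption(new CorruptedException("task 0_1 corrupted")));
        restoreThread.start();
        restoreThread.join();

        try {
            runOnce(restoreSide);
        } catch (CorruptedException e) {
            System.out.println("handling: " + e.getMessage());
        }
        runOnce(restoreSide); // nothing pending, returns normally
    }
}
```

The point of the design is that the restore thread never throws into the stream thread directly; it only records the failure, and the stream thread decides when to surface it.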
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##########
@@ -601,12 +620,14 @@ private void restoreChangelog(final ChangelogMetadata changelogMetadata) {
                 throw new StreamsException("State restore listener failed on restore completed", e);
             }
         }
+
+        return numRecords;
     }

     private Map<TopicPartition, Long> committedOffsetForChangelogs(final Set<TopicPartition> partitions) {
         final Map<TopicPartition, Long> committedOffsets;
         try {
-            committedOffsets = fetchCommittedOffsets(partitions, mainConsumer);
+            committedOffsets = fetchCommittedOffsets(partitions, groupId, adminClient);

Review comment:
Now that the changelog reader is accessed by a separate thread, we can no longer let it access the main consumer: the consumer is not thread-safe and we do not allow concurrent access. So here I'm switching to the admin client to get committed offsets.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
##########
@@ -115,13 +114,16 @@ public boolean isClosed() {
     @Override
     public void revive() {
         if (state == CLOSED) {
+            // clear all the stores since they should be re-registered

Review comment:
Here's one of the tricky things we need to pay attention to: the restore thread de-registers changelogs asynchronously, so it needs the task state-manager's changelog partitions. There are a few different scenarios:

1) Completely closing the task: we remove the task from the task-manager, but materialize the state-manager's changelog partitions before clearing the stores, since they need to be enqueued to the restore-thread.
2) Reviving a corrupted task: we close the task but do NOT remove it from the task-manager, and we materialize the changelog partitions before clearing the stores.
3) Recycling a task: we do NOT close the task but we do remove it from the task-manager; hence we do not clear the state-manager's stores either, since we want to skip re-registering them. BUT we still need to materialize the changelog partitions to be enqueued to the restore-thread.
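The three scenarios in the AbstractTask comment can be summarized in a small sketch. It only illustrates the ordering constraint (snapshot the changelog partitions first, then clear the stores if the scenario calls for it). `StateManagerSketch`, `Scenario`, and `partitionsToDeregister` are hypothetical names made up for this example and are not taken from the PR.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class ChangelogSnapshotDemo {
    // Hypothetical stand-in for a task's state manager: store name -> changelog partition.
    static final class StateManagerSketch {
        private final Map<String, String> storeToChangelog = new HashMap<>();

        void register(String store, String changelogPartition) {
            storeToChangelog.put(store, changelogPartition);
        }

        // Materialize a copy so that a later clearStores() does not affect it.
        Set<String> changelogPartitions() {
            return new HashSet<>(storeToChangelog.values());
        }

        void clearStores() {
            storeToChangelog.clear();
        }
    }

    // The three cases from the review comment, encoded as flags.
    enum Scenario {
        CLOSE(true, true, true),
        REVIVE_CORRUPTED(true, false, true),
        RECYCLE(false, true, false);

        final boolean closesTask;
        final boolean removesFromTaskManager;
        final boolean clearsStores;

        Scenario(boolean closesTask, boolean removesFromTaskManager, boolean clearsStores) {
            this.closesTask = closesTask;
            this.removesFromTaskManager = removesFromTaskManager;
            this.clearsStores = clearsStores;
        }
    }

    // Returns the partitions to enqueue to the restore thread for de-registration.
    static Set<String> partitionsToDeregister(StateManagerSketch stateManager, Scenario scenario) {
        // Always snapshot first: in every scenario the restore thread needs these.
        Set<String> snapshot = stateManager.changelogPartitions();
        if (scenario.clearsStores) {
            stateManager.clearStores();
        }
        return snapshot;
    }

    public static void main(String[] args) {
        for (Scenario scenario : Scenario.values()) {
            StateManagerSketch stateManager = new StateManagerSketch();
            stateManager.register("counts", "app-counts-changelog-0");
            Set<String> toDeregister = partitionsToDeregister(stateManager, scenario);
            System.out.println(scenario + ": deregister=" + toDeregister
                + ", storesLeft=" + stateManager.changelogPartitions().size());
        }
    }
}
```

In all three cases the snapshot is non-empty; only the recycling case leaves the stores registered afterwards.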