guozhangwang commented on a change in pull request #8988:
URL: https://github.com/apache/kafka/pull/8988#discussion_r488375084



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java
##########
@@ -34,7 +34,7 @@
 
     final KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future;
 
-    ListConsumerGroupOffsetsResult(KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
+    public ListConsumerGroupOffsetsResult(KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future) {

Review comment:
       This needs to be public for unit testing.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
##########
@@ -146,6 +146,7 @@ public synchronized void assign(Collection<TopicPartition> partitions) {
         ensureNotClosed();
         committed.clear();
         this.subscriptions.assignFromUser(new HashSet<>(partitions));
+        this.paused.retainAll(partitions);

Review comment:
       Piggy-backed fix: when partitions are unassigned, they should also be removed from the paused list.
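A minimal sketch of the invariant this fix restores, using a hypothetical stand-in (`PauseTrackingConsumer`, with partitions modeled as plain strings) rather than the real `MockConsumer`: after `assign`, the paused set should only contain partitions that are still assigned.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for MockConsumer's assignment/pause bookkeeping.
// Partitions are plain strings such as "topic-0" to keep the sketch
// free of Kafka dependencies.
class PauseTrackingConsumer {
    private final Set<String> assigned = new HashSet<>();
    private final Set<String> paused = new HashSet<>();

    synchronized void assign(Collection<String> partitions) {
        assigned.clear();
        assigned.addAll(partitions);
        // The piggy-backed fix: forget pause state for any partition
        // that is not part of the new assignment.
        paused.retainAll(partitions);
    }

    synchronized void pause(Collection<String> partitions) {
        for (String tp : partitions) {
            // In this sketch only assigned partitions can be paused.
            if (!assigned.contains(tp)) {
                throw new IllegalStateException("No current assignment for partition " + tp);
            }
            paused.add(tp);
        }
    }

    synchronized Set<String> paused() {
        // Return a read-only copy so callers cannot mutate internal state.
        return Collections.unmodifiableSet(new HashSet<>(paused));
    }
}
```

In this sketch, without the `retainAll` call, `paused()` would still report a partition after it has been unassigned, and it would look paused again if it were later re-assigned.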

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
##########
@@ -185,7 +181,7 @@ StreamTask createActiveTaskFromStandby(final StandbyTask standbyTask,
         final LogContext logContext = getLogContext(standbyTask.id);
 
         standbyTask.closeCleanAndRecycleState();
-        stateManager.transitionTaskType(TaskType.ACTIVE, logContext);
+        stateManager.prepareNewTaskType(TaskType.ACTIVE, logContext);

Review comment:
       This is another tricky part: since the restore thread checks the state-manager's type to decide whether to check for completion, we should not change the type of recycled tasks immediately, i.e. before they have been asynchronously de-registered from (and later re-registered with) the restore thread.
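A sketch of the ordering constraint, assuming a simplified model: `prepareNewTaskType` takes its name from the diff, while `onDeregistered()`, `SketchStateManager`, and `SketchRestoreCheck` are hypothetical. The new type is recorded but not exposed until the restore thread has de-registered the old changelogs, so the completion check keeps seeing the old type in the meantime.

```java
// Hypothetical task types for the sketch.
enum SketchTaskType { ACTIVE, STANDBY }

// Hypothetical state manager whose visible type only changes once the
// restore thread has de-registered the task's old changelogs.
class SketchStateManager {
    private SketchTaskType taskType;
    private SketchTaskType pendingType;  // null when no transition is pending

    SketchStateManager(SketchTaskType initial) {
        this.taskType = initial;
    }

    // Record the new type without exposing it yet.
    void prepareNewTaskType(SketchTaskType newType) {
        this.pendingType = newType;
    }

    // Hypothetical hook invoked after asynchronous de-registration.
    void onDeregistered() {
        if (pendingType != null) {
            taskType = pendingType;
            pendingType = null;
        }
    }

    SketchTaskType taskType() {
        return taskType;
    }
}

// Hypothetical restore-thread check: completion is only tracked for active tasks.
class SketchRestoreCheck {
    static boolean shouldCheckCompletion(SketchStateManager stateManager) {
        return stateManager.taskType() == SketchTaskType.ACTIVE;
    }
}
```

Flipping the type eagerly (as a direct `transitionTaskType` would) would let the restore thread start treating a still-registered standby changelog as an active one.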

##########
File path: streams/src/test/java/org/apache/kafka/test/StateMachineTask.java
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.AbstractTask;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.streams.processor.internals.Task;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class StateMachineTask extends AbstractTask implements Task {

Review comment:
       Extracted this mock class since it is now needed in multiple unit tests.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -659,13 +665,12 @@ void runOnce() {
             }
         }
 
-        // we can always let changelog reader try restoring in order to initialize the changelogs;
-        // if there's no active restoring or standby updating it would not try to fetch any data
-        changelogReader.restore();
-
-        // TODO: we should record the restore latency and its relative time spent ratio after
-        //       we figure out how to move this method out of the stream thread
-        advanceNowAndComputeLatency();
+        // check if restore thread has encountered TaskCorrupted exception; if yes
+        // rethrow it to trigger the handling logic
+        final TaskCorruptedException e = restoreThread.nextCorruptedException();

Review comment:
       @ableegoldman I haven't addressed this comment yet, but it's planned for my next PR: this one has become too large, so I'm keeping only the changes necessary for correctness.
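The diff above has the stream thread poll the restore thread for a pending `TaskCorruptedException` and rethrow it. A minimal sketch of that hand-off, assuming a concurrent queue between the two threads; `nextCorruptedException()` is the name from the diff, and all other class and method names are hypothetical.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for Kafka Streams' TaskCorruptedException.
class SketchTaskCorruptedException extends RuntimeException {
    SketchTaskCorruptedException(String message) {
        super(message);
    }
}

// Restore-thread side: exceptions are queued instead of thrown,
// because the handling logic lives on the stream thread.
class SketchRestoreThread {
    private final Queue<SketchTaskCorruptedException> corruptedExceptions =
        new ConcurrentLinkedQueue<>();

    void reportCorruption(SketchTaskCorruptedException e) {
        corruptedExceptions.add(e);
    }

    // Returns null when nothing is pending.
    SketchTaskCorruptedException nextCorruptedException() {
        return corruptedExceptions.poll();
    }
}

// Stream-thread side: one check per runOnce() iteration.
class SketchStreamThread {
    private final SketchRestoreThread restoreThread;

    SketchStreamThread(SketchRestoreThread restoreThread) {
        this.restoreThread = restoreThread;
    }

    void runOnce() {
        // ... poll and process records ...
        final SketchTaskCorruptedException e = restoreThread.nextCorruptedException();
        if (e != null) {
            // Rethrow on this thread so the existing corruption handling runs here.
            throw e;
        }
    }
}
```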

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
##########
@@ -26,24 +26,18 @@
 public interface ChangelogReader extends ChangelogRegister {
     /**
      * Restore all registered state stores by reading from their changelogs
+     *
+     * @return number of records restored in this call
      */
-    void restore();
-
-    /**
-     * Transit to restore active changelogs mode
-     */
-    void enforceRestoreActive();

Review comment:
       With the restore changelog reader now owned solely by a single thread, we no longer need to expose these as public APIs.
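A reduced sketch of the contract in the diff, where `restore()` reports the number of records restored in the call, driven by a loop that owns the reader exclusively. The `long` return type and the `SketchChangelogReader`/`SketchRestoreLoop` names are assumptions for illustration, not the PR's code.

```java
// Hypothetical reduced ChangelogReader: restore() returns the number of
// records restored in this call (return type long is an assumption).
interface SketchChangelogReader {
    long restore();
}

// Hypothetical restore loop that is the reader's only caller, so mode
// switches such as enforceRestoreActive() need not be public.
class SketchRestoreLoop {
    private final SketchChangelogReader reader;
    private long totalRestored = 0;

    SketchRestoreLoop(SketchChangelogReader reader) {
        this.reader = reader;
    }

    // Runs one iteration; returns true if any records were restored,
    // which a caller could use to decide whether to back off.
    boolean runOnce() {
        final long restored = reader.restore();
        totalRestored += restored;
        return restored > 0;
    }

    long totalRestored() {
        return totalRestored;
    }
}
```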

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##########
@@ -601,12 +620,14 @@ private void restoreChangelog(final ChangelogMetadata changelogMetadata) {
                 throw new StreamsException("State restore listener failed on restore completed", e);
             }
         }
+
+        return numRecords;
     }
 
     private Map<TopicPartition, Long> committedOffsetForChangelogs(final Set<TopicPartition> partitions) {
         final Map<TopicPartition, Long> committedOffsets;
         try {
-            committedOffsets = fetchCommittedOffsets(partitions, mainConsumer);
+            committedOffsets = fetchCommittedOffsets(partitions, groupId, adminClient);

Review comment:
       Now that the changelog reader is accessed by a separate thread, we can no longer let it access the main consumer: the consumer is not thread-safe, and we do not allow concurrent access to it.
   
   So here I'm switching to the admin client to fetch committed offsets.
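To illustrate why sharing the main consumer across threads is off the table: `KafkaConsumer` guards its public methods and throws `ConcurrentModificationException` when a second thread calls in while another thread is using it. The sketch below is a hypothetical, simplified guard in that spirit (`SingleThreadGuard` is not a Kafka class): the first thread to `acquire()` owns it until the matching `release()`, and any other thread is rejected.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical single-owner guard, similar in spirit to the check the
// real KafkaConsumer performs on entry to its public methods.
class SingleThreadGuard {
    private static final long NO_CURRENT_THREAD = -1L;
    private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
    private int refCount = 0;  // only touched by the owning thread

    void acquire() {
        final long threadId = Thread.currentThread().getId();
        // Re-entry by the owner is allowed; any other thread must find the guard free.
        if (threadId != currentThread.get()
                && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId)) {
            throw new ConcurrentModificationException("not safe for multi-threaded access");
        }
        refCount++;
    }

    void release() {
        // Hand ownership back once the outermost acquire() is released.
        if (--refCount == 0) {
            currentThread.set(NO_CURRENT_THREAD);
        }
    }
}
```

On the admin side, the real call for this is `Admin#listConsumerGroupOffsets(String groupId)`, whose `ListConsumerGroupOffsetsResult#partitionsToOffsetAndMetadata()` yields a `KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>`.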

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
##########
@@ -115,13 +114,16 @@ public boolean isClosed() {
     @Override
     public void revive() {
         if (state == CLOSED) {
+            // clear all the stores since they should be re-registered

Review comment:
       Here's one of the tricky things we need to pay attention to: the restore-thread needs the task state-manager's changelog partitions in order to de-register them asynchronously. There are a few different scenarios:
   
   1) Completely closing the task: we remove the task from the task-manager, but we materialize the state-manager's changelog partitions before clearing the stores, since they need to be enqueued to the restore-thread.
   
   2) Reviving a corrupted task: we close the task but do NOT remove it from the task-manager, and we materialize the changelog partitions before clearing the stores.
   
   3) Recycling a task: we do NOT close the task, but we do remove it from the task-manager; hence we also do not clear the state-manager's stores, since we want to skip re-registering them. BUT we still need to materialize the changelog partitions so they can be enqueued to the restore-thread.
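The three scenarios can be summarized in a small model. This is a hypothetical sketch (`TaskExitScenario`, `SketchTaskExit`, and the string partition names are illustrative, not Kafka Streams code): in every case the changelog partitions are snapshotted first, so the restore-thread still receives them even when the live set is cleared right afterwards.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical encoding of the three scenarios described above.
enum TaskExitScenario {
    CLOSE_COMPLETELY(true, true),   // 1) remove from task-manager, clear stores
    REVIVE_CORRUPTED(false, true),  // 2) keep in task-manager, clear stores
    RECYCLE(true, false);           // 3) remove from task-manager, keep stores

    final boolean removeFromTaskManager;
    final boolean clearStores;

    TaskExitScenario(boolean removeFromTaskManager, boolean clearStores) {
        this.removeFromTaskManager = removeFromTaskManager;
        this.clearStores = clearStores;
    }
}

class SketchTaskExit {
    // Live changelog partitions registered by the task's state manager.
    final Set<String> changelogPartitions = new HashSet<>();
    boolean inTaskManager = true;
    // Hypothetical queue consumed asynchronously by the restore-thread.
    final Queue<Set<String>> toDeregister = new ConcurrentLinkedQueue<>();

    void exit(TaskExitScenario scenario) {
        // Materialize first: clearing the stores below would otherwise
        // leave nothing for the restore-thread to de-register.
        final Set<String> snapshot =
            Collections.unmodifiableSet(new HashSet<>(changelogPartitions));
        if (scenario.clearStores) {
            changelogPartitions.clear();  // stores are re-registered later
        }
        if (scenario.removeFromTaskManager) {
            inTaskManager = false;
        }
        toDeregister.add(snapshot);
    }
}
```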




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

