guozhangwang commented on a change in pull request #8988:
URL: https://github.com/apache/kafka/pull/8988#discussion_r500001027



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -659,13 +665,12 @@ void runOnce() {
             }
         }
 
-        // we can always let changelog reader try restoring in order to 
initialize the changelogs;
-        // if there's no active restoring or standby updating it would not try 
to fetch any data
-        changelogReader.restore();
-
-        // TODO: we should record the restore latency and its relative time 
spent ratio after
-        //       we figure out how to move this method out of the stream thread
-        advanceNowAndComputeLatency();
+        // check if restore thread has encountered TaskCorrupted exception; if 
yes
+        // rethrow it to trigger the handling logic
+        final TaskCorruptedException e = 
restoreThread.nextCorruptedException();

Review comment:
       Updating the fields of TaskCorruptedException could be risky since it 
can be read by the other main thread concurrently. I think a better way would 
be still keeping its field as immutable, but drain all the exceptions (which is 
thread-safe) and then create a new one aggregating its tasks.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
##########
@@ -115,13 +114,16 @@ public boolean isClosed() {
     @Override
     public void revive() {
         if (state == CLOSED) {
+            // clear all the stores since they should be re-registered

Review comment:
       `ProcessorStateManager#changelogPartitions` relies on its 
`changelogOffsets` which relies on the `stores` map. If `stores` map gets 
cleared, then `changelogPartitions` would return nothing. So in order to get 
its changelog partitions to send to the restore thread, we need to get them 
first before clearing the `stores` map, i.e. we need to "materialize" that 
`changelogPartitions` map first --- maybe we did not use the right term here, 
sorry.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -623,16 +613,46 @@ void runOnce() {
             return;
         }
 
-        initializeAndRestorePhase();
+        // we need to first add closed tasks and then created tasks to work 
with those revived / recycled tasks
+        restoreThread.addClosedTasks(taskManager.drainRemovedTasks());
+
+        // try to initialize created tasks that are either newly assigned or 
re-created from corrupted tasks
+        final List<AbstractTask> initializedTasks;
+        if (!(initializedTasks = 
taskManager.tryInitializeNewTasks()).isEmpty()) {
+            if (log.isDebugEnabled()) {
+                log.debug("Initializing newly created tasks {} under state {}",
+                        
initializedTasks.stream().map(AbstractTask::id).collect(Collectors.toList()), 
state);
+            }
+

Review comment:
       Ack.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestoreThread.java
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.TaskCorruptedException;
+import org.apache.kafka.streams.processor.StateRestoreListener;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+
+/**
+ * This is the thread responsible for restoring state stores for both active 
and standby tasks
+ */
+public class StateRestoreThread extends Thread {
+
+    private final Time time;
+    private final Logger log;
+    private final ChangelogReader changelogReader;
+    private final AtomicBoolean isRunning = new AtomicBoolean(true);
+    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+    private final LinkedBlockingDeque<TaskItem> taskItemQueue;
+    private final AtomicReference<Set<TopicPartition>> completedChangelogs;
+    private final LinkedBlockingDeque<TaskCorruptedException> 
corruptedExceptions;
+
+    public boolean isRunning() {
+        return isRunning.get();
+    }
+
+    public StateRestoreThread(final Time time,
+                              final StreamsConfig config,
+                              final String threadClientId,
+                              final Admin adminClient,
+                              final String groupId,
+                              final Consumer<byte[], byte[]> restoreConsumer,
+                              final StateRestoreListener 
userStateRestoreListener) {
+        this(time, threadClientId, new StoreChangelogReader(time, config, 
threadClientId,
+                adminClient, groupId, restoreConsumer, 
userStateRestoreListener));
+    }
+
+    // for testing only
+    public StateRestoreThread(final Time time,
+                              final String threadClientId,
+                              final ChangelogReader changelogReader) {
+        super(threadClientId);
+
+        final String logPrefix = String.format("state-restore-thread [%s] ", 
threadClientId);
+        final LogContext logContext = new LogContext(logPrefix);
+
+        this.time = time;
+        this.log = logContext.logger(getClass());
+        this.taskItemQueue = new LinkedBlockingDeque<>();
+        this.corruptedExceptions = new LinkedBlockingDeque<>();
+        this.completedChangelogs = new 
AtomicReference<>(Collections.emptySet());
+
+        this.changelogReader = changelogReader;
+    }
+
+    private synchronized void waitIfAllChangelogsCompleted() {
+        final Set<TopicPartition> allChangelogs = 
changelogReader.allChangelogs();
+        if (allChangelogs.equals(changelogReader.completedChangelogs())) {
+            log.debug("All changelogs {} have completed restoration so far, 
will wait " +
+                    "until new changelogs are registered", allChangelogs);
+
+            while (isRunning.get() && taskItemQueue.isEmpty()) {
+                try {
+                    wait();
+                } catch (final InterruptedException e) {
+                    // do nothing
+                }
+            }
+        }
+    }
+
+    public synchronized void addInitializedTasks(final List<AbstractTask> 
tasks) {
+        if (!tasks.isEmpty()) {
+            for (final AbstractTask task: tasks) {
+                taskItemQueue.add(new TaskItem(task, ItemType.CREATE, 
task.changelogPartitions()));
+            }
+            notifyAll();
+        }
+    }
+
+    public synchronized void addClosedTasks(final Map<AbstractTask, 
Collection<TopicPartition>> tasks) {
+        if (!tasks.isEmpty()) {
+            for (final Map.Entry<AbstractTask, Collection<TopicPartition>> 
entry : tasks.entrySet()) {
+                taskItemQueue.add(new TaskItem(entry.getKey(), ItemType.CLOSE, 
entry.getValue()));
+            }
+            notifyAll();
+        }
+    }
+
+    public Set<TopicPartition> completedChangelogs() {
+        return completedChangelogs.get();
+    }
+
+    @Override
+    public void run() {
+        try {
+            while (isRunning()) {
+                runOnce();
+            }
+        } catch (final Exception e) {
+            log.error("Encountered the following exception while restoring 
states " +
+                    "and the thread is going to shut down: ", e);
+            throw e;
+        } finally {
+            try {
+                changelogReader.clear();
+            } catch (final Throwable e) {
+                log.error("Failed to close changelog reader due to the 
following error:", e);
+            }
+
+            shutdownLatch.countDown();
+        }
+    }
+
+    // Visible for testing
+    void runOnce() {
+        waitIfAllChangelogsCompleted();
+
+        if (!isRunning.get())
+            return;
+
+        // a task being recycled maybe in both closed and initialized tasks,
+        // and hence we should process the closed ones first and then 
initialized ones
+        final List<TaskItem> items = new ArrayList<>();
+        taskItemQueue.drainTo(items);
+
+        if (!items.isEmpty()) {
+            for (final TaskItem item : items) {
+                // TODO: we should consider also call the listener if the
+                //       changelog is not yet completed
+                if (item.type == ItemType.CLOSE) {
+                    changelogReader.unregister(item.changelogPartitions);
+
+                    log.info("Unregistered changelogs {} for closing task {}",
+                            item.task.changelogPartitions(),
+                            item.task.id());
+                } else if (item.type == ItemType.CREATE) {
+                    // we should only convert the state manager type right 
StateRestoreThreadTest.javabefore re-registering the changelog
+                    item.task.stateMgr.maybeConvertToNewTaskType();
+
+                    for (final TopicPartition partition : 
item.changelogPartitions) {
+                        changelogReader.register(partition, 
item.task.stateMgr);
+                    }
+
+                    log.info("Registered changelogs {} for created task {}",
+                            item.task.changelogPartitions(),
+                            item.task.id());
+                }
+            }

Review comment:
       I forgot to remove the type `REVIVE` actually... the revived task will 
be treated as a CLOSE followed by a CREATE, and since we always try to drain 
CLOSE'ed tasks and then CREATED tasks, it means revived tasks would always be 
processed as CLOSE and then as CREATE in order.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestoreThread.java
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.TaskCorruptedException;
+import org.apache.kafka.streams.processor.StateRestoreListener;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+
+/**
+ * This is the thread responsible for restoring state stores for both active 
and standby tasks
+ */
+public class StateRestoreThread extends Thread {
+
+    private final Time time;
+    private final Logger log;
+    private final ChangelogReader changelogReader;
+    private final AtomicBoolean isRunning = new AtomicBoolean(true);
+    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+    private final LinkedBlockingDeque<TaskItem> taskItemQueue;
+    private final AtomicReference<Set<TopicPartition>> completedChangelogs;
+    private final LinkedBlockingDeque<TaskCorruptedException> 
corruptedExceptions;
+
+    public boolean isRunning() {
+        return isRunning.get();
+    }
+
+    public StateRestoreThread(final Time time,
+                              final StreamsConfig config,
+                              final String threadClientId,
+                              final Admin adminClient,
+                              final String groupId,
+                              final Consumer<byte[], byte[]> restoreConsumer,
+                              final StateRestoreListener 
userStateRestoreListener) {
+        this(time, threadClientId, new StoreChangelogReader(time, config, 
threadClientId,
+                adminClient, groupId, restoreConsumer, 
userStateRestoreListener));
+    }
+
+    // for testing only
+    public StateRestoreThread(final Time time,
+                              final String threadClientId,
+                              final ChangelogReader changelogReader) {
+        super(threadClientId);
+
+        final String logPrefix = String.format("state-restore-thread [%s] ", 
threadClientId);
+        final LogContext logContext = new LogContext(logPrefix);
+
+        this.time = time;
+        this.log = logContext.logger(getClass());
+        this.taskItemQueue = new LinkedBlockingDeque<>();
+        this.corruptedExceptions = new LinkedBlockingDeque<>();
+        this.completedChangelogs = new 
AtomicReference<>(Collections.emptySet());
+
+        this.changelogReader = changelogReader;
+    }
+
+    private synchronized void waitIfAllChangelogsCompleted() {
+        final Set<TopicPartition> allChangelogs = 
changelogReader.allChangelogs();
+        if (allChangelogs.equals(changelogReader.completedChangelogs())) {
+            log.debug("All changelogs {} have completed restoration so far, 
will wait " +
+                    "until new changelogs are registered", allChangelogs);
+
+            while (isRunning.get() && taskItemQueue.isEmpty()) {
+                try {
+                    wait();
+                } catch (final InterruptedException e) {
+                    // do nothing
+                }
+            }
+        }
+    }
+
+    public synchronized void addInitializedTasks(final List<AbstractTask> 
tasks) {
+        if (!tasks.isEmpty()) {
+            for (final AbstractTask task: tasks) {
+                taskItemQueue.add(new TaskItem(task, ItemType.CREATE, 
task.changelogPartitions()));
+            }
+            notifyAll();
+        }
+    }
+
+    public synchronized void addClosedTasks(final Map<AbstractTask, 
Collection<TopicPartition>> tasks) {
+        if (!tasks.isEmpty()) {
+            for (final Map.Entry<AbstractTask, Collection<TopicPartition>> 
entry : tasks.entrySet()) {
+                taskItemQueue.add(new TaskItem(entry.getKey(), ItemType.CLOSE, 
entry.getValue()));
+            }
+            notifyAll();
+        }
+    }
+
+    public Set<TopicPartition> completedChangelogs() {
+        return completedChangelogs.get();
+    }
+
+    @Override
+    public void run() {
+        try {
+            while (isRunning()) {
+                runOnce();
+            }
+        } catch (final Exception e) {
+            log.error("Encountered the following exception while restoring 
states " +
+                    "and the thread is going to shut down: ", e);
+            throw e;
+        } finally {
+            try {
+                changelogReader.clear();
+            } catch (final Throwable e) {
+                log.error("Failed to close changelog reader due to the 
following error:", e);
+            }
+
+            shutdownLatch.countDown();
+        }
+    }
+
+    // Visible for testing
+    void runOnce() {
+        waitIfAllChangelogsCompleted();
+
+        if (!isRunning.get())
+            return;
+
+        // a task being recycled maybe in both closed and initialized tasks,
+        // and hence we should process the closed ones first and then 
initialized ones
+        final List<TaskItem> items = new ArrayList<>();
+        taskItemQueue.drainTo(items);
+
+        if (!items.isEmpty()) {
+            for (final TaskItem item : items) {
+                // TODO: we should consider also call the listener if the

Review comment:
       Yeah I meant this exactly (you brought this up when I was in the middle 
of it, so I added this comment to remind anyone in the future). Created a 
ticket for this and updated the comment: 
https://issues.apache.org/jira/browse/KAFKA-10575

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##########
@@ -220,11 +224,6 @@ public void closeDirty() {
     @Override
     public void closeCleanAndRecycleState() {
         
streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), 
id.toString());
-        if (state() == State.SUSPENDED) {

Review comment:
       Makes sense.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
##########
@@ -185,7 +181,7 @@ StreamTask createActiveTaskFromStandby(final StandbyTask 
standbyTask,
         final LogContext logContext = getLogContext(standbyTask.id);
 
         standbyTask.closeCleanAndRecycleState();
-        stateManager.transitionTaskType(TaskType.ACTIVE, logContext);
+        stateManager.prepareNewTaskType(TaskType.ACTIVE, logContext);

Review comment:
       The "check" I meant above is in StoreChangelogReader in various places, 
which is called by the restore thread. When an active task recycles to a 
standby task, the following can happen:
   
   1. task is closed.
   2. task type is changed to `standby`.
   3. task transits to CREATED state.
   
   Previously the changelogs are synchronously removed at step 1 and re-added 
in step 3. But now they are removed / re-added asynchronously, say as step 4/5. 
So if we still change its type in step 2, then it is possible that the 
changelog reader from the other thread tries to access its type in between and 
hence gets into error state. Therefore I have to defer step 2) in between 4) 
and 5), and executed by the restore thread rather than the main thread.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -623,16 +613,46 @@ void runOnce() {
             return;
         }
 
-        initializeAndRestorePhase();
+        // we need to first add closed tasks and then created tasks to work 
with those revived / recycled tasks
+        restoreThread.addClosedTasks(taskManager.drainRemovedTasks());
+
+        // try to initialize created tasks that are either newly assigned or 
re-created from corrupted tasks
+        final List<AbstractTask> initializedTasks;
+        if (!(initializedTasks = 
taskManager.tryInitializeNewTasks()).isEmpty()) {
+            if (log.isDebugEnabled()) {
+                log.debug("Initializing newly created tasks {} under state {}",
+                        
initializedTasks.stream().map(AbstractTask::id).collect(Collectors.toList()), 
state);
+            }
+
+            restoreThread.addInitializedTasks(initializedTasks);
+        }
+
+        // try complete restoration if there are any restoring tasks
+        if 
(taskManager.tryToCompleteRestoration(restoreThread.completedChangelogs())) {
+            log.debug("Completed restoring all tasks now");

Review comment:
       Sounds good. I actually let it to return `boolean` for testing purposes, 
and I agree that in non-testing code we actually do not care much about the 
returned boolean. I will update accordingly.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestoreThread.java
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.TaskCorruptedException;
+import org.apache.kafka.streams.processor.StateRestoreListener;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+
+/**
+ * This is the thread responsible for restoring state stores for both active 
and standby tasks
+ */
+public class StateRestoreThread extends Thread {
+
+    private final Time time;
+    private final Logger log;
+    private final ChangelogReader changelogReader;
+    private final AtomicBoolean isRunning = new AtomicBoolean(true);
+    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+    private final LinkedBlockingDeque<TaskItem> taskItemQueue;
+    private final AtomicReference<Set<TopicPartition>> completedChangelogs;
+    private final LinkedBlockingDeque<TaskCorruptedException> 
corruptedExceptions;
+
+    public boolean isRunning() {
+        return isRunning.get();
+    }
+
+    public StateRestoreThread(final Time time,
+                              final StreamsConfig config,
+                              final String threadClientId,
+                              final Admin adminClient,
+                              final String groupId,
+                              final Consumer<byte[], byte[]> restoreConsumer,
+                              final StateRestoreListener 
userStateRestoreListener) {
+        this(time, threadClientId, new StoreChangelogReader(time, config, 
threadClientId,
+                adminClient, groupId, restoreConsumer, 
userStateRestoreListener));
+    }
+
+    // for testing only
+    public StateRestoreThread(final Time time,
+                              final String threadClientId,
+                              final ChangelogReader changelogReader) {
+        super(threadClientId);
+
+        final String logPrefix = String.format("state-restore-thread [%s] ", 
threadClientId);
+        final LogContext logContext = new LogContext(logPrefix);
+
+        this.time = time;
+        this.log = logContext.logger(getClass());
+        this.taskItemQueue = new LinkedBlockingDeque<>();
+        this.corruptedExceptions = new LinkedBlockingDeque<>();
+        this.completedChangelogs = new 
AtomicReference<>(Collections.emptySet());
+
+        this.changelogReader = changelogReader;
+    }
+
+    private synchronized void waitIfAllChangelogsCompleted() {
+        final Set<TopicPartition> allChangelogs = 
changelogReader.allChangelogs();
+        if (allChangelogs.equals(changelogReader.completedChangelogs())) {
+            log.debug("All changelogs {} have completed restoration so far, 
will wait " +
+                    "until new changelogs are registered", allChangelogs);
+
+            while (isRunning.get() && taskItemQueue.isEmpty()) {
+                try {
+                    wait();
+                } catch (final InterruptedException e) {
+                    // do nothing
+                }
+            }
+        }
+    }
+
+    public synchronized void addInitializedTasks(final List<AbstractTask> 
tasks) {
+        if (!tasks.isEmpty()) {
+            for (final AbstractTask task: tasks) {
+                taskItemQueue.add(new TaskItem(task, ItemType.CREATE, 
task.changelogPartitions()));
+            }
+            notifyAll();
+        }
+    }
+
+    public synchronized void addClosedTasks(final Map<AbstractTask, 
Collection<TopicPartition>> tasks) {
+        if (!tasks.isEmpty()) {
+            for (final Map.Entry<AbstractTask, Collection<TopicPartition>> 
entry : tasks.entrySet()) {
+                taskItemQueue.add(new TaskItem(entry.getKey(), ItemType.CLOSE, 
entry.getValue()));
+            }
+            notifyAll();
+        }
+    }
+
+    public Set<TopicPartition> completedChangelogs() {
+        return completedChangelogs.get();
+    }
+
+    @Override
+    public void run() {
+        try {
+            while (isRunning()) {
+                runOnce();
+            }
+        } catch (final Exception e) {
+            log.error("Encountered the following exception while restoring 
states " +
+                    "and the thread is going to shut down: ", e);
+            throw e;
+        } finally {
+            try {
+                changelogReader.clear();
+            } catch (final Throwable e) {
+                log.error("Failed to close changelog reader due to the 
following error:", e);
+            }
+
+            shutdownLatch.countDown();
+        }
+    }
+
+    // Visible for testing
+    void runOnce() {
+        waitIfAllChangelogsCompleted();
+
+        if (!isRunning.get())
+            return;
+
+        // a task being recycled maybe in both closed and initialized tasks,
+        // and hence we should process the closed ones first and then 
initialized ones
+        final List<TaskItem> items = new ArrayList<>();
+        taskItemQueue.drainTo(items);
+
+        if (!items.isEmpty()) {
+            for (final TaskItem item : items) {
+                // TODO: we should consider also call the listener if the
+                //       changelog is not yet completed
+                if (item.type == ItemType.CLOSE) {
+                    changelogReader.unregister(item.changelogPartitions);
+
+                    log.info("Unregistered changelogs {} for closing task {}",
+                            item.task.changelogPartitions(),
+                            item.task.id());
+                } else if (item.type == ItemType.CREATE) {
+                    // we should only convert the state manager type right 
StateRestoreThreadTest.javabefore re-registering the changelog
+                    item.task.stateMgr.maybeConvertToNewTaskType();
+
+                    for (final TopicPartition partition : 
item.changelogPartitions) {
+                        changelogReader.register(partition, 
item.task.stateMgr);
+                    }
+
+                    log.info("Registered changelogs {} for created task {}",
+                            item.task.changelogPartitions(),
+                            item.task.id());
+                }
+            }
+        }
+        items.clear();
+
+        // try to restore some changelogs
+        final long startMs = time.milliseconds();
+        try {
+            final int numRestored = changelogReader.restore();
+            // TODO: we should record the restoration related metrics
+            log.debug("Restored {} records in {} ms", numRestored, 
time.milliseconds() - startMs);
+        } catch (final TaskCorruptedException e) {
+            log.warn("Detected the states of tasks " + 
e.corruptedTaskWithChangelogs() + " are corrupted. " +
+                    "Will close the task as dirty and re-create and bootstrap 
from scratch.", e);
+
+            // remove corrupted partitions form the changelog reader and 
continue; we can still proceed
+            // and restore other partitions until the main thread come to 
handle this exception
+            
changelogReader.unregister(e.corruptedTaskWithChangelogs().values().stream()
+                    .flatMap(Collection::stream)
+                    .collect(Collectors.toList()));
+
+            corruptedExceptions.add(e);
+        } catch (final StreamsException e) {
+            // if we are shutting down, the consumer could throw interrupt 
exception which can be ignored;
+            // otherwise, we re-throw
+            if (!(e.getCause() instanceof InterruptException) || 
isRunning.get()) {
+                throw e;
+            }
+        } catch (final TimeoutException e) {
+            log.info("Encountered timeout when restoring states, will retry in 
the next loop");
+        }
+
+        // finally update completed changelogs
+        completedChangelogs.set(changelogReader.completedChangelogs());
+    }
+
+    public TaskCorruptedException nextCorruptedException() {
+        return corruptedExceptions.poll();
+    }
+
+    public void shutdown(final long timeoutMs) throws InterruptedException {
+        log.info("Shutting down");
+
+        isRunning.set(false);
+        interrupt();
+
+        final boolean ret = shutdownLatch.await(timeoutMs, 
TimeUnit.MILLISECONDS);
+
+        if (ret) {
+            log.info("Shutdown complete");
+        } else {
+            log.warn("Shutdown timed out after {}", timeoutMs);
+        }
+    }
+
+    private enum ItemType {
+        CREATE,
+        CLOSE,
+        REVIVE
+    }
+
+    private static class TaskItem {
+        private final ItemType type;
+        private final AbstractTask task;

Review comment:
       Yeah `AbstractTask#stateManager` is the reason I went with 
`AbstractTask`. I can add a new method to `Task`.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestoreThread.java
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.TaskCorruptedException;
+import org.apache.kafka.streams.processor.StateRestoreListener;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+
+/**
+ * This is the thread responsible for restoring state stores for both active 
and standby tasks
+ */
+public class StateRestoreThread extends Thread {
+
+    private final Time time;
+    private final Logger log;
+    private final ChangelogReader changelogReader;
+    private final AtomicBoolean isRunning = new AtomicBoolean(true);
+    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+    private final LinkedBlockingDeque<TaskItem> taskItemQueue;
+    private final AtomicReference<Set<TopicPartition>> completedChangelogs;
+    private final LinkedBlockingDeque<TaskCorruptedException> 
corruptedExceptions;
+
+    public boolean isRunning() {
+        return isRunning.get();
+    }
+
+    public StateRestoreThread(final Time time,
+                              final StreamsConfig config,
+                              final String threadClientId,
+                              final Admin adminClient,
+                              final String groupId,
+                              final Consumer<byte[], byte[]> restoreConsumer,
+                              final StateRestoreListener 
userStateRestoreListener) {
+        this(time, threadClientId, new StoreChangelogReader(time, config, 
threadClientId,
+                adminClient, groupId, restoreConsumer, 
userStateRestoreListener));
+    }
+
+    // for testing only
+    public StateRestoreThread(final Time time,
+                              final String threadClientId,
+                              final ChangelogReader changelogReader) {
+        super(threadClientId);
+
+        final String logPrefix = String.format("state-restore-thread [%s] ", 
threadClientId);
+        final LogContext logContext = new LogContext(logPrefix);
+
+        this.time = time;
+        this.log = logContext.logger(getClass());
+        this.taskItemQueue = new LinkedBlockingDeque<>();
+        this.corruptedExceptions = new LinkedBlockingDeque<>();
+        this.completedChangelogs = new 
AtomicReference<>(Collections.emptySet());
+
+        this.changelogReader = changelogReader;
+    }
+
+    private synchronized void waitIfAllChangelogsCompleted() {
+        final Set<TopicPartition> allChangelogs = 
changelogReader.allChangelogs();
+        if (allChangelogs.equals(changelogReader.completedChangelogs())) {
+            log.debug("All changelogs {} have completed restoration so far, 
will wait " +
+                    "until new changelogs are registered", allChangelogs);
+
+            while (isRunning.get() && taskItemQueue.isEmpty()) {
+                try {
+                    wait();
+                } catch (final InterruptedException e) {
+                    // do nothing
+                }
+            }
+        }
+    }
+
+    public synchronized void addInitializedTasks(final List<AbstractTask> 
tasks) {
+        if (!tasks.isEmpty()) {
+            for (final AbstractTask task: tasks) {
+                taskItemQueue.add(new TaskItem(task, ItemType.CREATE, 
task.changelogPartitions()));
+            }
+            notifyAll();
+        }
+    }
+
+    public synchronized void addClosedTasks(final Map<AbstractTask, 
Collection<TopicPartition>> tasks) {
+        if (!tasks.isEmpty()) {
+            for (final Map.Entry<AbstractTask, Collection<TopicPartition>> 
entry : tasks.entrySet()) {
+                taskItemQueue.add(new TaskItem(entry.getKey(), ItemType.CLOSE, 
entry.getValue()));
+            }
+            notifyAll();
+        }
+    }
+
+    public Set<TopicPartition> completedChangelogs() {
+        return completedChangelogs.get();
+    }
+
+    @Override
+    public void run() {
+        try {
+            while (isRunning()) {
+                runOnce();
+            }
+        } catch (final Exception e) {
+            log.error("Encountered the following exception while restoring 
states " +
+                    "and the thread is going to shut down: ", e);
+            throw e;
+        } finally {
+            try {
+                changelogReader.clear();
+            } catch (final Throwable e) {
+                log.error("Failed to close changelog reader due to the 
following error:", e);
+            }
+
+            shutdownLatch.countDown();
+        }
+    }
+
+    // Visible for testing
+    void runOnce() {
+        waitIfAllChangelogsCompleted();
+
+        if (!isRunning.get())
+            return;
+
+        // a task being recycled maybe in both closed and initialized tasks,
+        // and hence we should process the closed ones first and then 
initialized ones
+        final List<TaskItem> items = new ArrayList<>();
+        taskItemQueue.drainTo(items);
+
+        if (!items.isEmpty()) {

Review comment:
       Ack

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##########
@@ -440,8 +455,11 @@ public void restore() {
 
                 final Map<TaskId, Collection<TopicPartition>> 
taskWithCorruptedChangelogs = new HashMap<>();
                 for (final TopicPartition partition : e.partitions()) {
-                    final TaskId taskId = 
changelogs.get(partition).stateManager.taskId();
-                    taskWithCorruptedChangelogs.computeIfAbsent(taskId, k -> 
new HashSet<>()).add(partition);
+                    final ChangelogMetadata metadata = 
changelogs.get(partition);
+                    if (metadata != null) {

Review comment:
       Makes sense.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -623,16 +613,46 @@ void runOnce() {
             return;
         }
 
-        initializeAndRestorePhase();
+        // we need to first add closed tasks and then created tasks to work 
with those revived / recycled tasks
+        restoreThread.addClosedTasks(taskManager.drainRemovedTasks());
+
+        // try to initialize created tasks that are either newly assigned or 
re-created from corrupted tasks
+        final List<AbstractTask> initializedTasks;
+        if (!(initializedTasks = 
taskManager.tryInitializeNewTasks()).isEmpty()) {
+            if (log.isDebugEnabled()) {
+                log.debug("Initializing newly created tasks {} under state {}",
+                        
initializedTasks.stream().map(AbstractTask::id).collect(Collectors.toList()), 
state);
+            }
+
+            restoreThread.addInitializedTasks(initializedTasks);
+        }
+
+        // try complete restoration if there are any restoring tasks
+        if 
(taskManager.tryToCompleteRestoration(restoreThread.completedChangelogs())) {
+            log.debug("Completed restoring all tasks now");
+        }
+
+        if (state == State.PARTITIONS_ASSIGNED && 
taskManager.allTasksRunning()) {
+            // it is possible that we have no assigned tasks in which case we 
would still transit state
+            setState(State.RUNNING);
+
+            log.debug("All tasks are now running and transited State to {}", 
State.RUNNING);
+        }
 
-        // TODO: we should record the restore latency and its relative time 
spent ratio after
-        //       we figure out how to move this method out of the stream thread
-        advanceNowAndComputeLatency();
+        // check if restore thread has encountered TaskCorrupted exception; if 
yes
+        // rethrow it to trigger the handling logic
+        final TaskCorruptedException e = 
restoreThread.nextCorruptedException();
+        if (e != null) {
+            throw e;
+        }
 
         int totalProcessed = 0;
         long totalCommitLatency = 0L;
         long totalProcessLatency = 0L;
         long totalPunctuateLatency = 0L;
+
+        // TODO: we should allow active tasks processing even if we are not 
yet in RUNNING
+        //       after restoration is moved to the other thread

Review comment:
       We do have a JIRA ticket for this, and I do plan to do it right after 
this PR (since this is really a pre-requisite to have that). I will link the 
ticket here.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to