vvcephei commented on a change in pull request #8994:
URL: https://github.com/apache/kafka/pull/8994#discussion_r452357020

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -215,7 +215,7 @@ public StateStore getGlobalStore(final String name) {
     }
     // package-private for test only
-    void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) {
+    void initializeStoreOffsetsFromCheckpoint(final boolean taskDirIsEmpty) {

Review comment:
       Was deceptively named.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
##########
@@ -566,8 +567,20 @@ public void shouldReviveCorruptTasks() {
         topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString());
         expectLastCall().anyTimes();
+        expect(consumer.assignment()).andReturn(taskId00Partitions);
+        consumer.pause(taskId00Partitions);
+        expectLastCall();
+        final OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0L);
+        expect(consumer.committed(taskId00Partitions)).andReturn(singletonMap(t1p0, offsetAndMetadata));
+        consumer.seek(t1p0, offsetAndMetadata);
+        expectLastCall();
+        consumer.seekToBeginning(emptySet());
+        expectLastCall();
         replay(activeTaskCreator, topologyBuilder, consumer, changeLogReader);
-
+        taskManager.setPartitionResetter(tp -> {
+            assertThat(tp, is(empty()));
+            return emptySet();
+        });

Review comment:
       This all amounts to checking that we really reset the consumer to the last committed position.
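
For readers following along, here is a minimal sketch of the behaviour the test above is checking, as I read it: partitions with a committed offset are moved back to that offset, and anything without one is handed to the reset policy. This is not the TaskManager code. `CommittedResetSketch`, `seekToCommitted`, and the string partition names are hypothetical stand-ins that use plain collections instead of the consumer API.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the consumer's position bookkeeping; not the Kafka API.
final class CommittedResetSketch {
    // Current fetch position per partition name.
    final Map<String, Long> positions = new HashMap<>();

    // Moves every partition that has a committed offset back to that offset.
    // Returns the partitions with no committed offset, so the caller can
    // apply its own reset policy (for example, seek to beginning) to them.
    Set<String> seekToCommitted(final Set<String> partitions, final Map<String, Long> committed) {
        final Set<String> notCommitted = new HashSet<>();
        for (final String partition : partitions) {
            final Long offset = committed.get(partition);
            if (offset != null) {
                positions.put(partition, offset);
            } else {
                notCommitted.add(partition);
            }
        }
        return notCommitted;
    }
}
```

In the test, the only partition has a committed offset, so the set handed to the resetter is expected to be empty, which is what the `assertThat(tp, is(empty()))` in the lambda verifies.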

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
##########
@@ -504,7 +504,7 @@ private void resetOffsetPosition(TopicPartition tp) {
         if (strategy == OffsetResetStrategy.EARLIEST) {
             offset = beginningOffsets.get(tp);
             if (offset == null)
-                throw new IllegalStateException("MockConsumer didn't have beginning offset specified, but tried to seek to beginning");
+                throw new IllegalStateException("MockConsumer didn't have beginning offset for " + tp + " specified, but tried to seek to beginning");

Review comment:
       Just a quality-of-life improvement.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -232,10 +232,16 @@ void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) {
                     log.debug("State store {} initialized from checkpoint with offset {} at changelog {}", store.stateStore.name(), store.offset, store.changelogPartition);
                 } else {
-                    // with EOS, if the previous run did not shutdown gracefully, we may lost the checkpoint file
+                    // With EOS, if the previous run did not shutdown gracefully, we may lost the checkpoint file
                     // and hence we are uncertain that the current local state only contains committed data;
                     // in that case we need to treat it as a task-corrupted exception
-                    if (eosEnabled && !storeDirIsEmpty) {
+
+                    // Note, this is a little overzealous, since we aren't checking whether the store's specific
+                    // directory is nonempty, only if there are any directories for any stores. So if there are
+                    // two stores in a task, and one is correctly written and checkpointed, while the other is
+                    // neither written nor checkpointed, we _could_ correctly load the first and recover the second
+                    // but instead we'll consider the whole task corrupted and discard the first and recover both.
+                    if (store.stateStore.persistent() && eosEnabled && !taskDirIsEmpty) {

Review comment:
       Bugfix: we shouldn't call this task corrupted for not having a checkpoint of a non-persistent store.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -462,7 +468,7 @@ public void flush() {
      */
     @Override
     public void close() throws ProcessorStateException {
-        log.debug("Closing its state manager and all the registered state stores: {}", stores);
+        log.info("Closing its state manager and all the registered state stores: {}", stores);

Review comment:
       ```suggestion
           log.debug("Closing its state manager and all the registered state stores: {}", stores);
       ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -648,7 +650,7 @@ void runOnce() {
         // only try to initialize the assigned tasks
         // if the state is still in PARTITION_ASSIGNED after the poll call
-        if (state == State.PARTITIONS_ASSIGNED) {
+        if (state == State.PARTITIONS_ASSIGNED || taskManager.hasPreRunningTasks()) {

Review comment:
       Bugfix: If a task has been revived, then it needs to be initialized and restored, even if the thread state is already RUNNING.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -193,6 +196,26 @@ private void closeAndRevive(final Map<Task, Collection<TopicPartition>> taskWith
                 log.error("Error suspending corrupted task {} ", task.id(), swallow);
             }
             task.closeDirty();
+            if (task.isActive()) {

Review comment:
       Shouldn't try to reset the main consumer at all if this is only a standby.
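
To spell out the missing-checkpoint bugfix in ProcessorStateManager from earlier in this message, here is a rough sketch of the new condition as a standalone predicate. The class and method names are hypothetical; only the three inputs (`persistent()`, `eosEnabled`, `taskDirIsEmpty`) come from the diff.

```java
// Hypothetical helper that isolates the condition from the diff; not part of the PR.
final class CheckpointDecisionSketch {
    // Returns true when a store without a checkpointed offset should cause
    // the whole task to be treated as corrupted.
    static boolean missingCheckpointMeansCorrupted(final boolean storeIsPersistent,
                                                   final boolean eosEnabled,
                                                   final boolean taskDirIsEmpty) {
        // A non-persistent store never has local state to distrust, so a
        // missing checkpoint for it is expected rather than a sign of corruption.
        return storeIsPersistent && eosEnabled && !taskDirIsEmpty;
    }
}
```

Before the change, the first argument was effectively always true, so an in-memory store in an EOS task with a non-empty task directory would have been flagged as corrupted.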

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -768,9 +770,30 @@ void runOnce() {
     private void resetInvalidOffsets(final InvalidOffsetException e) {
         final Set<TopicPartition> partitions = e.partitions();
+        final Set<TopicPartition> notReset = resetOffsets(partitions);

Review comment:
       Refactored to share the new resetOffsets method.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -193,6 +196,26 @@ private void closeAndRevive(final Map<Task, Collection<TopicPartition>> taskWith
                 log.error("Error suspending corrupted task {} ", task.id(), swallow);
             }
             task.closeDirty();
+            if (task.isActive()) {
+                // Pause so we won't poll any more records for this task until it has been re-initialized
+                // Note, closeDirty already clears the partitiongroup for the task.
+                final Set<TopicPartition> currentAssignment = mainConsumer().assignment();
+                final Set<TopicPartition> assignedToPauseAndReset =
+                    Utils.intersection(HashSet::new, currentAssignment, task.inputPartitions());

Review comment:
       I don't fully understand how this could happen, but I saw in several tests that an active task isn't assigned its own input TopicPartition. Whether it can happen with a real consumer, or only in tests with the MockConsumer, doesn't really seem important. If the partition isn't assigned, we certainly don't need to reset it.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org