vvcephei commented on a change in pull request #8994:
URL: https://github.com/apache/kafka/pull/8994#discussion_r452357020



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -215,7 +215,7 @@ public StateStore getGlobalStore(final String name) {
     }
 
     // package-private for test only
-    void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) {
+    void initializeStoreOffsetsFromCheckpoint(final boolean taskDirIsEmpty) {

Review comment:
       Was deceptively named.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
##########
@@ -566,8 +567,20 @@ public void shouldReviveCorruptTasks() {
         topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString());
         expectLastCall().anyTimes();
 
+        expect(consumer.assignment()).andReturn(taskId00Partitions);
+        consumer.pause(taskId00Partitions);
+        expectLastCall();
+        final OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0L);
+        expect(consumer.committed(taskId00Partitions)).andReturn(singletonMap(t1p0, offsetAndMetadata));
+        consumer.seek(t1p0, offsetAndMetadata);
+        expectLastCall();
+        consumer.seekToBeginning(emptySet());
+        expectLastCall();
         replay(activeTaskCreator, topologyBuilder, consumer, changeLogReader);
-
+        taskManager.setPartitionResetter(tp -> {
+            assertThat(tp, is(empty()));
+            return emptySet();
+        });

Review comment:
       This all amounts to checking that we really reset the consumer to the 
last committed position.
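
      For anyone reading along, here's a minimal sketch of the consumer-side behavior these expectations pin down (the class and method names are illustrative, not the actual TaskManager code): pause the revived task's partitions, seek each one back to its last committed offset, and leave anything without a committed offset for the reset policy, which is why the test also expects `seekToBeginning(emptySet())`.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

final class CommittedOffsetResetSketch {
    // Hypothetical helper mirroring what the mock expectations above verify:
    // pause the corrupted task's partitions, then move each one back to its
    // last committed position; anything without a committed offset is returned
    // so the caller can apply the configured reset policy instead.
    static Set<TopicPartition> resetToCommitted(final Consumer<byte[], byte[]> consumer,
                                                final Set<TopicPartition> partitions) {
        consumer.pause(partitions);
        final Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(partitions);
        final Set<TopicPartition> withoutCommittedOffset = new HashSet<>();
        for (final TopicPartition partition : partitions) {
            final OffsetAndMetadata offset = committed.get(partition);
            if (offset != null) {
                consumer.seek(partition, offset);
            } else {
                withoutCommittedOffset.add(partition);
            }
        }
        return withoutCommittedOffset;
    }
}
```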

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
##########
@@ -504,7 +504,7 @@ private void resetOffsetPosition(TopicPartition tp) {
         if (strategy == OffsetResetStrategy.EARLIEST) {
             offset = beginningOffsets.get(tp);
             if (offset == null)
-                throw new IllegalStateException("MockConsumer didn't have beginning offset specified, but tried to seek to beginning");
+                throw new IllegalStateException("MockConsumer didn't have beginning offset for " + tp + " specified, but tried to seek to beginning");

Review comment:
       Just a quality-of-life improvement.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -232,10 +232,16 @@ void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) {
                        log.debug("State store {} initialized from checkpoint with offset {} at changelog {}",
                                  store.stateStore.name(), store.offset, store.changelogPartition);
                    } else {
-                        // with EOS, if the previous run did not shutdown gracefully, we may lost the checkpoint file
+                        // With EOS, if the previous run did not shutdown gracefully, we may lost the checkpoint file
                         // and hence we are uncertain that the current local state only contains committed data;
                         // in that case we need to treat it as a task-corrupted exception
-                        if (eosEnabled && !storeDirIsEmpty) {
+
+                        // Note, this is a little overzealous, since we aren't checking whether the store's specific
+                        // directory is nonempty, only if there are any directories for any stores. So if there are
+                        // two stores in a task, and one is correctly written and checkpointed, while the other is
+                        // neither written nor checkpointed, we _could_ correctly load the first and recover the second
+                        // but instead we'll consider the whole task corrupted and discard the first and recover both.
+                        if (store.stateStore.persistent() && eosEnabled && !taskDirIsEmpty) {

Review comment:
       Bugfix: we shouldn't call this task corrupted for not having a 
checkpoint of a non-persistent store.
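
      Restated as a standalone predicate (a hypothetical sketch, not the actual method body): the missing-checkpoint case should only count as corruption when the store could actually have durable on-disk state to distrust.

```java
final class CheckpointGuardSketch {
    // Hypothetical predicate capturing the fixed condition in
    // initializeStoreOffsetsFromCheckpoint: a store with no checkpointed offset
    // only signals corruption under EOS, when the task directory already has
    // contents, and when the store is persistent. In-memory stores never leave
    // state on disk, so a missing checkpoint for them is expected and harmless.
    static boolean shouldMarkTaskCorrupted(final boolean storeIsPersistent,
                                           final boolean eosEnabled,
                                           final boolean taskDirIsEmpty) {
        return storeIsPersistent && eosEnabled && !taskDirIsEmpty;
    }
}
```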

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -462,7 +468,7 @@ public void flush() {
      */
     @Override
     public void close() throws ProcessorStateException {
-        log.debug("Closing its state manager and all the registered state stores: {}", stores);
+        log.info("Closing its state manager and all the registered state stores: {}", stores);

Review comment:
      ```suggestion
          log.debug("Closing its state manager and all the registered state stores: {}", stores);
      ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -648,7 +650,7 @@ void runOnce() {
 
         // only try to initialize the assigned tasks
         // if the state is still in PARTITION_ASSIGNED after the poll call
-        if (state == State.PARTITIONS_ASSIGNED) {
+        if (state == State.PARTITIONS_ASSIGNED || taskManager.hasPreRunningTasks()) {

Review comment:
       Bugfix: If a task has been revived, then it needs to be initialized and 
restored, even if the thread state is already RUNNING.
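
      As a tiny sketch of the fixed guard (hypothetical helper and parameter names, not the actual runOnce code): the decision can no longer look at the thread state alone, because a revived task sits in a pre-running state even while the thread itself stays RUNNING.

```java
final class RestorationGuardSketch {
    // Hypothetical restatement of the guard in StreamThread#runOnce:
    // restoration is attempted while the thread is still completing a rebalance,
    // or whenever the TaskManager holds revived tasks that were reset to a
    // pre-running state and therefore need to be initialized and restored again.
    static boolean shouldInitializeAndRestoreTasks(final boolean partitionsAssigned,
                                                   final boolean hasPreRunningTasks) {
        return partitionsAssigned || hasPreRunningTasks;
    }
}
```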

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -193,6 +196,26 @@ private void closeAndRevive(final Map<Task, Collection<TopicPartition>> taskWith
                log.error("Error suspending corrupted task {} ", task.id(), swallow);
             }
             task.closeDirty();
+            if (task.isActive()) {

Review comment:
       Shouldn't try to reset the main consumer at all if this is only a 
standby.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -768,9 +770,30 @@ void runOnce() {
 
     private void resetInvalidOffsets(final InvalidOffsetException e) {
         final Set<TopicPartition> partitions = e.partitions();
+        final Set<TopicPartition> notReset = resetOffsets(partitions);

Review comment:
       Refactored to share the new resetOffsets method.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -193,6 +196,26 @@ private void closeAndRevive(final Map<Task, Collection<TopicPartition>> taskWith
                log.error("Error suspending corrupted task {} ", task.id(), swallow);
            }
            task.closeDirty();
+            if (task.isActive()) {
+                // Pause so we won't poll any more records for this task until it has been re-initialized
+                // Note, closeDirty already clears the partitiongroup for the task.
+                final Set<TopicPartition> currentAssignment = mainConsumer().assignment();
+                final Set<TopicPartition> assignedToPauseAndReset =
+                    Utils.intersection(HashSet::new, currentAssignment, task.inputPartitions());

Review comment:
       I don't fully understand how this could happen, but I saw it in several 
tests, that an active task isn't assigned the input TopicPartition for itself. 
Whether or not it can happen with a real consumer, or only in tests with the 
MockConsumer, doesn't really seem important. If the topic isn't assigned, we 
certainly don't need to reset it.
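
      A hedged sketch of the defensive intersection, pulled out into a standalone helper (the class and method names are illustrative; `Consumer#assignment` and `Utils.intersection` are the same APIs the diff uses): only the input partitions the consumer currently owns get paused and reset, and anything unassigned is simply skipped.

```java
import java.util.HashSet;
import java.util.Set;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;

final class ReviveResetGuardSketch {
    // Hypothetical helper: intersect the consumer's current assignment with the
    // revived task's input partitions, so we never pause or seek a partition
    // the consumer does not actually own at the moment of revival.
    static Set<TopicPartition> pausableAndResettable(final Consumer<byte[], byte[]> consumer,
                                                     final Set<TopicPartition> inputPartitions) {
        final Set<TopicPartition> currentAssignment = consumer.assignment();
        return Utils.intersection(HashSet::new, currentAssignment, inputPartitions);
    }
}
```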




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

