vvcephei commented on a change in pull request #8994:
URL: https://github.com/apache/kafka/pull/8994#discussion_r452357020
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -215,7 +215,7 @@ public StateStore getGlobalStore(final String name) {
}
// package-private for test only
- void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) {
+ void initializeStoreOffsetsFromCheckpoint(final boolean taskDirIsEmpty) {
Review comment:
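The parameter was deceptively named: the flag reflects whether the task
directory is empty, not an individual store's directory.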
##########
File path:
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
##########
@@ -566,8 +567,20 @@ public void shouldReviveCorruptTasks() {
topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString());
expectLastCall().anyTimes();
+ expect(consumer.assignment()).andReturn(taskId00Partitions);
+ consumer.pause(taskId00Partitions);
+ expectLastCall();
+ final OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0L);
+ expect(consumer.committed(taskId00Partitions)).andReturn(singletonMap(t1p0, offsetAndMetadata));
+ consumer.seek(t1p0, offsetAndMetadata);
+ expectLastCall();
+ consumer.seekToBeginning(emptySet());
+ expectLastCall();
replay(activeTaskCreator, topologyBuilder, consumer, changeLogReader);
-
+ taskManager.setPartitionResetter(tp -> {
+ assertThat(tp, is(empty()));
+ return emptySet();
+ });
Review comment:
This all amounts to checking that we really reset the consumer to the
last committed position.
##########
File path:
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
##########
@@ -504,7 +504,7 @@ private void resetOffsetPosition(TopicPartition tp) {
if (strategy == OffsetResetStrategy.EARLIEST) {
offset = beginningOffsets.get(tp);
if (offset == null)
- throw new IllegalStateException("MockConsumer didn't have beginning offset specified, but tried to seek to beginning");
+ throw new IllegalStateException("MockConsumer didn't have beginning offset for " + tp + " specified, but tried to seek to beginning");
Review comment:
Just a quality-of-life improvement.
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -232,10 +232,16 @@ void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) {
log.debug("State store {} initialized from checkpoint with offset {} at changelog {}",
store.stateStore.name(), store.offset, store.changelogPartition);
} else {
- // with EOS, if the previous run did not shutdown gracefully, we may lost the checkpoint file
+ // With EOS, if the previous run did not shutdown gracefully, we may lost the checkpoint file
// and hence we are uncertain that the current local state only contains committed data;
// in that case we need to treat it as a task-corrupted exception
- if (eosEnabled && !storeDirIsEmpty) {
+
+ // Note, this is a little overzealous, since we aren't checking whether the store's specific
+ // directory is nonempty, only if there are any directories for any stores. So if there are
+ // two stores in a task, and one is correctly written and checkpointed, while the other is
+ // neither written nor checkpointed, we _could_ correctly load the first and recover the second
+ // but instead we'll consider the whole task corrupted and discard the first and recover both.
+ if (store.stateStore.persistent() && eosEnabled && !taskDirIsEmpty) {
Review comment:
Bugfix: we shouldn't call this task corrupted for not having a
checkpoint of a non-persistent store.
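
To illustrate the bugfix, here is a minimal standalone sketch of the new
condition, which applies only when a store has no checkpointed offset. The
class and method names (`CheckpointCorruptionSketch`,
`missingCheckpointMeansCorrupted`) are hypothetical and not part of Kafka;
only the boolean expression mirrors the diff above.

```java
public class CheckpointCorruptionSketch {

    // Hypothetical helper, not Kafka code: decides whether a store that has
    // no checkpointed offset should cause the task to be treated as corrupted.
    // Mirrors: store.stateStore.persistent() && eosEnabled && !taskDirIsEmpty
    static boolean missingCheckpointMeansCorrupted(final boolean storeIsPersistent,
                                                   final boolean eosEnabled,
                                                   final boolean taskDirIsEmpty) {
        return storeIsPersistent && eosEnabled && !taskDirIsEmpty;
    }

    public static void main(final String[] args) {
        // Non-persistent (for example, in-memory) store under EOS with a
        // non-empty task directory: no longer counted as corruption.
        System.out.println(missingCheckpointMeansCorrupted(false, true, false));
        // Persistent store in the same situation: still counted as corruption.
        System.out.println(missingCheckpointMeansCorrupted(true, true, false));
    }
}
```

Before the fix, the first case would also have been reported as corrupted,
because the persistence of the store was not part of the check.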
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -462,7 +468,7 @@ public void flush() {
*/
@Override
public void close() throws ProcessorStateException {
- log.debug("Closing its state manager and all the registered state stores: {}", stores);
+ log.info("Closing its state manager and all the registered state stores: {}", stores);
Review comment:
```suggestion
log.debug("Closing its state manager and all the registered state stores: {}", stores);
```
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -648,7 +650,7 @@ void runOnce() {
// only try to initialize the assigned tasks
// if the state is still in PARTITION_ASSIGNED after the poll call
- if (state == State.PARTITIONS_ASSIGNED) {
+ if (state == State.PARTITIONS_ASSIGNED || taskManager.hasPreRunningTasks()) {
Review comment:
Bugfix: If a task has been revived, then it needs to be initialized and
restored, even if the thread state is already RUNNING.
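
As a rough sketch of the gate this change introduces: the `State` enum below
is a simplified stand-in with only two values (the real `StreamThread.State`
has more), and `shouldInitializeTasks` is a hypothetical helper, not a method
in Kafka. Only the shape of the condition comes from the diff.

```java
public class InitGateSketch {

    // Simplified stand-in for StreamThread.State; assumption for illustration.
    enum State { PARTITIONS_ASSIGNED, RUNNING }

    // Hypothetical helper mirroring:
    // state == State.PARTITIONS_ASSIGNED || taskManager.hasPreRunningTasks()
    static boolean shouldInitializeTasks(final State state, final boolean hasPreRunningTasks) {
        return state == State.PARTITIONS_ASSIGNED || hasPreRunningTasks;
    }

    public static void main(final String[] args) {
        // Thread is already RUNNING, but a revived task still needs
        // initialization and restoration.
        System.out.println(shouldInitializeTasks(State.RUNNING, true));
        // Thread is RUNNING and every task is already running.
        System.out.println(shouldInitializeTasks(State.RUNNING, false));
    }
}
```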
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -193,6 +196,26 @@ private void closeAndRevive(final Map<Task, Collection<TopicPartition>> taskWith
log.error("Error suspending corrupted task {} ", task.id(), swallow);
}
task.closeDirty();
+ if (task.isActive()) {
Review comment:
We shouldn't try to reset the main consumer at all if the task is only a
standby.
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -768,9 +770,30 @@ void runOnce() {
private void resetInvalidOffsets(final InvalidOffsetException e) {
final Set<TopicPartition> partitions = e.partitions();
+ final Set<TopicPartition> notReset = resetOffsets(partitions);
Review comment:
Refactored to share the new resetOffsets method.
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -193,6 +196,26 @@ private void closeAndRevive(final Map<Task, Collection<TopicPartition>> taskWith
log.error("Error suspending corrupted task {} ", task.id(), swallow);
}
task.closeDirty();
+ if (task.isActive()) {
+ // Pause so we won't poll any more records for this task until it has been re-initialized
+ // Note, closeDirty already clears the partitiongroup for the task.
+ final Set<TopicPartition> currentAssignment = mainConsumer().assignment();
+ final Set<TopicPartition> assignedToPauseAndReset =
+ Utils.intersection(HashSet::new, currentAssignment, task.inputPartitions());
Review comment:
I don't fully understand how this can happen, but in several tests I saw an
active task that wasn't assigned its own input TopicPartition. Whether that
can happen with a real consumer, or only in tests with the MockConsumer,
doesn't really seem important. If the partition isn't assigned, we certainly
don't need to reset it.
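
A minimal sketch of the intersection step, assuming partitions are
represented as plain strings rather than `TopicPartition` objects. The
`intersection` helper here is a hypothetical stand-in written with
`java.util` only; it is not the `Utils.intersection` method used in the diff.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class PauseAndResetSketch {

    // Hypothetical stand-in for the intersection used in the diff: returns a
    // new set holding only the elements present in both inputs.
    static <T> Set<T> intersection(final Set<T> first, final Set<T> second) {
        final Set<T> result = new HashSet<>(first);
        result.retainAll(second);
        return result;
    }

    public static void main(final String[] args) {
        // Partitions the consumer currently has assigned (illustrative names).
        final Set<String> currentAssignment = new HashSet<>(Arrays.asList("t1-0", "t2-0"));
        // Input partitions of the corrupted task; "t1-1" is not assigned.
        final Set<String> taskInputPartitions = new HashSet<>(Arrays.asList("t1-0", "t1-1"));

        // Only partitions that are both assigned and owned by the task are
        // candidates for pause and reset.
        System.out.println(intersection(currentAssignment, taskInputPartitions));
    }
}
```

Restricting the pause and reset to the intersection means an unassigned
input partition is simply skipped instead of being passed to the consumer.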