bbejeck commented on code in PR #21738:
URL: https://github.com/apache/kafka/pull/21738#discussion_r2935999817
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -254,70 +269,49 @@ public StateStore globalStore(final String name) {
}
// package-private for test only
- void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) {
- try {
- final Map<TopicPartition, Long> loadedCheckpoints =
checkpointFile.read();
-
- log.trace("Loaded offsets from the checkpoint file: {}",
loadedCheckpoints);
-
- for (final StateStoreMetadata store : stores.values()) {
- if (store.corrupted) {
- log.error("Tried to initialize store offsets for corrupted
store {}", store);
- throw new IllegalStateException("Should not initialize
offsets for a corrupted task");
- }
+ void initializeStoreOffsets(final boolean storeDirIsEmpty) {
+ for (final StateStoreMetadata store : stores.values()) {
+ if (store.corrupted) {
+ log.error("Tried to initialize store offsets for corrupted
store {}", store);
+ throw new ProcessorStateException(
+ "Error initializing offsets for store '" + store + "'",
+ new IllegalStateException("Should not initialize
offsets for a corrupted task")
+ );
+ }
- if (store.changelogPartition == null) {
- log.info("State store {} is not logged and hence would not
be restored", store.stateStore.name());
- } else if (!store.stateStore.persistent()) {
- log.info("Initializing to the starting offset for
changelog {} of in-memory state store {}",
- store.changelogPartition,
store.stateStore.name());
- } else if (store.offset() == null) {
- if
(loadedCheckpoints.containsKey(store.changelogPartition)) {
- final Long offset =
changelogOffsetFromCheckpointedOffset(loadedCheckpoints.remove(store.changelogPartition));
- store.setOffset(offset);
-
- log.info("State store {} initialized from checkpoint
with offset {} at changelog {}",
- store.stateStore.name(), store.offset,
store.changelogPartition);
- } else {
- // with EOS, if the previous run did not shutdown
gracefully, we may lost the checkpoint file
- // and hence we are uncertain that the current local
state only contains committed data;
- // in that case we need to treat it as a
task-corrupted exception
- if (eosEnabled && !storeDirIsEmpty) {
- log.warn("State store {} did not find checkpoint
offsets while stores are not empty, " +
+ if (store.changelogPartition == null) {
+ log.info("State store {} is not logged and hence would not be
restored", store.stateStore.name());
+ } else if (!store.stateStore.persistent()) {
+ log.info("Initializing to the starting offset for changelog {}
of in-memory state store {}",
+ store.changelogPartition, store.stateStore.name());
+ } else if (store.offset() == null) {
+ final Long offset =
store.stateStore.committedOffset(store.changelogPartition);
+
+ if (offset != null) {
+ store.setOffset(offset);
+ log.info("State store {} initialized from checkpoint with
offset {} at changelog {}",
+ store.stateStore.name(), store.offset,
store.changelogPartition);
+ } else {
+ // with EOS, if the previous run did not shutdown
gracefully, we may lost the checkpoint file
+ // and hence we are uncertain that the current local state
only contains committed data;
+ // in that case we need to treat it as a task-corrupted
exception
+ if (eosEnabled && !storeDirIsEmpty) {
+ log.warn("State store {} did not find checkpoint
offsets while stores are not empty, " +
"since under EOS it has the risk of getting
uncommitted data in stores we have to " +
"treat it as a task corruption error and wipe
out the local state of task {} " +
"before re-bootstrapping",
store.stateStore.name(), taskId);
- throw new
TaskCorruptedException(Collections.singleton(taskId));
- } else {
- log.info("State store {} did not find checkpoint
offset, hence would " +
- "default to the starting offset at changelog
{}",
+ throw new
TaskCorruptedException(Collections.singleton(taskId));
+ } else {
+ log.info("State store {} did not find checkpoint
offset, hence would " +
+ "default to the starting offset at
changelog {}",
store.stateStore.name(),
store.changelogPartition);
- }
}
- } else {
- loadedCheckpoints.remove(store.changelogPartition);
- log.debug("Skipping re-initialization of offset from
checkpoint for recycled store {}",
- store.stateStore.name());
}
}
-
- stateDirectory.updateTaskOffsets(taskId, changelogOffsets());
-
- if (!loadedCheckpoints.isEmpty()) {
- log.warn("Some loaded checkpoint offsets cannot find their
corresponding state stores: {}", loadedCheckpoints);
- }
-
- if (eosEnabled) {
- checkpointFile.delete();
- }
- } catch (final TaskCorruptedException e) {
- throw e;
- } catch (final IOException | RuntimeException e) {
- // both IOException or runtime exception like number parsing can
throw
- throw new ProcessorStateException(format("%sError loading and
deleting checkpoint file when creating the state manager",
- logPrefix), e);
}
+
+ stateDirectory.updateTaskOffsets(taskId, changelogOffsets());
Review Comment:
This used to be within the `try/catch` block, but now if there's an error it
will bubble up and possibly escape handling, since it's no longer going to throw
either `TaskCorruptedException` or `ProcessorStateException`.
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java:
##########
@@ -1040,36 +1032,6 @@ public void
shouldBeAbleToCloseWithoutRegisteringAnyStores() {
stateMgr.close();
}
- @Test
- public void shouldDeleteCheckPointFileIfEosEnabled() throws IOException {
Review Comment:
I'm wondering if we should keep this test for now, won't we have users
running in the pre-txn statestore mode for a while?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -254,70 +269,49 @@ public StateStore globalStore(final String name) {
}
// package-private for test only
- void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) {
- try {
- final Map<TopicPartition, Long> loadedCheckpoints =
checkpointFile.read();
-
- log.trace("Loaded offsets from the checkpoint file: {}",
loadedCheckpoints);
-
- for (final StateStoreMetadata store : stores.values()) {
- if (store.corrupted) {
- log.error("Tried to initialize store offsets for corrupted
store {}", store);
- throw new IllegalStateException("Should not initialize
offsets for a corrupted task");
- }
+ void initializeStoreOffsets(final boolean storeDirIsEmpty) {
+ for (final StateStoreMetadata store : stores.values()) {
+ if (store.corrupted) {
+ log.error("Tried to initialize store offsets for corrupted
store {}", store);
+ throw new ProcessorStateException(
Review Comment:
This is correct, but is there a chance this could lead to a behavior change?
The previous code only threw `IllegalStateException`, meaning this might now
land in a `catch` block it didn't before.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -444,7 +437,7 @@ boolean directoryForTaskIsEmpty(final TaskId taskId) {
private boolean taskDirIsEmpty(final File taskDir) {
final File[] storeDirs = taskDir.listFiles(pathname ->
- !pathname.getName().equals(CHECKPOINT_FILE_NAME));
+
!pathname.getName().equals(LegacyCheckpointingStateStore.CHECKPOINT_FILE_NAME));
Review Comment:
I think this needs to be updated, as it looks like the filter is going to miss
the new checkpoint file names of the form `checkpoint_<store name>`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]