nicktelford commented on code in PR #21554:
URL: https://github.com/apache/kafka/pull/21554#discussion_r2871190828
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LegacyCheckpointingStateStore.java:
##########
@@ -0,0 +1,218 @@
+package org.apache.kafka.streams.state.internals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.StateDirectory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static
org.apache.kafka.streams.state.internals.OffsetCheckpoint.OFFSET_UNKNOWN;
+
+public class LegacyCheckpointingStateStore<S extends StateStore, K, V> extends
WrappedStateStore<S, K, V> {
+
+ private static final Logger log =
LoggerFactory.getLogger(LegacyCheckpointingStateStore.class);
+
+ static final String CHECKPOINT_FILE_NAME = ".checkpoint";
+ static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L;
+
+ private final boolean eosEnabled;
+ private final Set<TopicPartition> changelogPartitions;
+ private final StateDirectory stateDirectory;
+ private final TaskId taskId;
+ private final OffsetCheckpoint checkpointFile;
+ private final String logPrefix;
+
+ private final Map<TopicPartition, Long> offsets = new HashMap<>();
+ private Map<TopicPartition, Long> checkpointedOffsets;
+
+ /**
+ * Wraps the given {@link StateStore} as a {@code
LegacyCheckpointingStateStore}, only if it is both
+ * {@link StateStore#persistent() persistent}, and it does not {@link
StateStore#managesOffsets() manage its own offsets}.
+ */
+ @SuppressWarnings("deprecation")
+ public static <S extends StateStore, K, V> StateStore maybeWrapStore(final
S wrapped,
+ final
boolean eosEnabled,
+ final
Set<TopicPartition> changelogPartitions,
+ final
StateDirectory stateDirectory,
+ final
TaskId taskId,
+ final
String logPrefix) {
+ return wrapped.persistent() && !wrapped.managesOffsets()
+ ? new LegacyCheckpointingStateStore<>(wrapped, eosEnabled,
changelogPartitions, stateDirectory, taskId, logPrefix)
+ : wrapped;
+ }
+
+ /**
+ * Unwraps the given store, only if it is a {@code
LegacyCheckpointingStateStore}.
+ */
+ public static StateStore maybeUnwrapStore(final StateStore store) {
+ return (store instanceof LegacyCheckpointingStateStore<?, ?, ?>)
+ ? ((LegacyCheckpointingStateStore<?, ?, ?>) store).wrapped()
+ : store;
+ }
+
+ /**
+ * Runs post-initialization for {@code LegacyCheckpointingStateStore}, only if
the {@code store} is one.
+ *
+ * This must be run after <em>ALL</em> stores have been initialized, as
it's possible it may delete a shared
+ * checkpoint file, which is needed during initialization.
+ */
+ public static void maybeCleanupCheckpointFile(final Iterable<StateStore>
stores) {
+ for (final StateStore store : stores) {
+ if (store instanceof LegacyCheckpointingStateStore) {
+ final LegacyCheckpointingStateStore<?, ?, ?> wrappedStore =
((LegacyCheckpointingStateStore<?, ?, ?>) store);
+ try {
+ if (wrappedStore.eosEnabled) {
+ wrappedStore.checkpointFile.delete();
+ }
+ } catch (final IOException e) {
+ throw new ProcessorStateException(String.format("%sError
deleting checkpoint file when creating StateStore '%s'",
wrappedStore.logPrefix, store.name()), e);
+ }
+ }
+ }
+ }
+
+ LegacyCheckpointingStateStore(final S wrapped,
+ final boolean eosEnabled,
+ final Set<TopicPartition>
changelogPartitions,
+ final StateDirectory stateDirectory,
+ final TaskId taskId,
+ final String logPrefix) {
+ super(wrapped);
+ this.eosEnabled = eosEnabled;
+ this.changelogPartitions = changelogPartitions;
+ this.stateDirectory = stateDirectory;
+ this.taskId = taskId;
+ this.checkpointFile = new OffsetCheckpoint(checkpointFileFor(taskId));
+ this.logPrefix = logPrefix;
+
+ // fail-crash; in this case we would not need to immediately close the
state store before throwing
+ if (CHECKPOINT_FILE_NAME.equals(wrapped.name())) {
+ wrapped.close();
+ throw new IllegalArgumentException(String.format("%sIllegal store
name: %s, which collides with the pre-defined " +
+ "checkpoint file name", logPrefix, wrapped.name()));
+ }
+ }
+
+ @Override
+ public void init(final StateStoreContext stateStoreContext, final
StateStore root) {
+ // load store offsets from checkpoint file
+ try {
+ final Map<TopicPartition, Long> allOffsets = checkpointFile.read();
+ for (final Map.Entry<TopicPartition, Long> entry :
allOffsets.entrySet()) {
+ if (changelogPartitions.contains(entry.getKey())) {
+ offsets.put(entry.getKey(),
changelogOffsetFromCheckpointedOffset(entry.getValue()));
+ }
+ }
+ checkpointedOffsets = new HashMap<>(offsets);
+ } catch (final IOException e) {
+ throw new ProcessorStateException(String.format("%sError loading
checkpoint file when creating StateStore '%s'", logPrefix, name()), e);
+ }
+
+ // initialize the actual store
+ super.init(stateStoreContext, root);
+ }
+
+ @Override
+ @Deprecated
+ public boolean managesOffsets() {
+ return true;
Review Comment:
@bbejeck The idea is that this implementation is used to take any
"non-managing" StateStore (i.e. one where `managesOffsets() == false`), and turn
it into a "managing" StateStore (i.e. one where `managesOffsets() == true`).
It does this by handling offset management via `.checkpoint` files.
Consequently, since any store wrapped in this implementation knows how to
manage its offsets, it's semantically correct for `managesOffsets()` to return
`true`.
The `managesOffsets()` method is only ever called by the `StateManager`s to
determine whether to wrap the implementation in a
`LegacyCheckpointingStateStore` anyway, so returning `true` here functions
mostly as a safety mechanism, to prevent stores from being double-wrapped
(although that's probably impossible anyway).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]