This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 53b4ce5  KAFKA-5998: fix checkpointableOffsets handling (#7030)
53b4ce5 is described below

commit 53b4ce5c00d61be87962f603682873665155cec4
Author: John Roesler <vvcep...@users.noreply.github.com>
AuthorDate: Fri Jul 12 08:42:11 2019 -0500

    KAFKA-5998: fix checkpointableOffsets handling (#7030)

    fix checkpoint file warning by filtering checkpointable offsets per task
    clean up state manager hierarchy to prevent similar bugs

    Reviewers: Bruno Cadonna <br...@confluent.io>, Bill Bejeck <bbej...@gmail.com>
---
 .../internals/GlobalStateManagerImpl.java           |  68 ++++---
 .../processor/internals/ProcessorStateManager.java  | 203 ++++++++++++++-------
 ...ractStateManager.java => StateManagerUtil.java}  |  41 ++---
 .../streams/processor/internals/StreamTask.java     |  11 +-
 .../streams/processor/internals/TaskManager.java    |   2 +-
 .../streams/state/internals/OffsetCheckpoint.java   |   5 +
 .../internals/GlobalStateManagerImplTest.java       |   8 +-
 .../processor/internals/MockChangelogReader.java    |   7 +-
 .../internals/ProcessorStateManagerTest.java        | 134 +++++++++++++-
 .../processor/internals/StandbyTaskTest.java        |   4 +-
 .../processor/internals/StreamTaskTest.java         |   6 +-
 .../processor/internals/StreamThreadTest.java       |   2 +-
 .../processor/internals/TaskManagerTest.java        |   6 +-
 13 files changed, 356 insertions(+), 141 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index f224e1e..6a3959f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.FixedOrderMap;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
@@ -32,6 +33,7 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.streams.state.internals.RecordConverter;
 import org.slf4j.Logger;
@@ -48,22 +50,30 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
+import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
+import static org.apache.kafka.streams.processor.internals.StateManagerUtil.converterForStore;
+
 /**
  * This class is responsible for the initialization, restoration, closing, flushing etc
  * of Global State Stores. There is only ever 1 instance of this class per Application Instance.
*/ -public class GlobalStateManagerImpl extends AbstractStateManager implements GlobalStateManager { +public class GlobalStateManagerImpl implements GlobalStateManager { private final Logger log; + private final boolean eosEnabled; private final ProcessorTopology topology; private final Consumer<byte[], byte[]> globalConsumer; + private final File baseDir; private final StateDirectory stateDirectory; private final Set<String> globalStoreNames = new HashSet<>(); + private final FixedOrderMap<String, Optional<StateStore>> globalStores = new FixedOrderMap<>(); private final StateRestoreListener stateRestoreListener; - private InternalProcessorContext processorContext; + private InternalProcessorContext globalProcessorContext; private final int retries; private final long retryBackoffMs; private final Duration pollTime; private final Set<String> globalNonPersistentStoresTopics = new HashSet<>(); + private final OffsetCheckpoint checkpointFile; + private final Map<TopicPartition, Long> checkpointFileCache; public GlobalStateManagerImpl(final LogContext logContext, final ProcessorTopology topology, @@ -71,7 +81,10 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob final StateDirectory stateDirectory, final StateRestoreListener stateRestoreListener, final StreamsConfig config) { - super(stateDirectory.globalStateDir(), StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG))); + eosEnabled = StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)); + baseDir = stateDirectory.globalStateDir(); + checkpointFile = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME)); + checkpointFileCache = new HashMap<>(); // Find non persistent store's topics final Map<String, String> storeToChangelogTopic = topology.storeToChangelogTopic(); @@ -81,19 +94,19 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob } } - this.log = logContext.logger(GlobalStateManagerImpl.class); + log = logContext.logger(GlobalStateManagerImpl.class); this.topology = topology; this.globalConsumer = globalConsumer; this.stateDirectory = stateDirectory; this.stateRestoreListener = stateRestoreListener; - this.retries = config.getInt(StreamsConfig.RETRIES_CONFIG); - this.retryBackoffMs = config.getLong(StreamsConfig.RETRY_BACKOFF_MS_CONFIG); - this.pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)); + retries = config.getInt(StreamsConfig.RETRIES_CONFIG); + retryBackoffMs = config.getLong(StreamsConfig.RETRY_BACKOFF_MS_CONFIG); + pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)); } @Override - public void setGlobalProcessorContext(final InternalProcessorContext processorContext) { - this.processorContext = processorContext; + public void setGlobalProcessorContext(final InternalProcessorContext globalProcessorContext) { + this.globalProcessorContext = globalProcessorContext; } @Override @@ -103,11 +116,11 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob throw new LockException(String.format("Failed to lock the global state directory: %s", baseDir)); } } catch (final IOException e) { - throw new LockException(String.format("Failed to lock the global state directory: %s", baseDir)); + throw new LockException(String.format("Failed to lock the global state directory: %s", baseDir), e); } try { - this.checkpointableOffsets.putAll(checkpoint.read()); + checkpointFileCache.putAll(checkpointFile.read()); } catch 
(final IOException e) { try { stateDirectory.unlockGlobalState(); @@ -120,7 +133,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob final List<StateStore> stateStores = topology.globalStateStores(); for (final StateStore stateStore : stateStores) { globalStoreNames.add(stateStore.name()); - stateStore.init(processorContext, stateStore); + stateStore.init(globalProcessorContext, stateStore); } return Collections.unmodifiableSet(globalStoreNames); } @@ -128,12 +141,17 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob @Override public void reinitializeStateStoresForPartitions(final Collection<TopicPartition> partitions, final InternalProcessorContext processorContext) { - super.reinitializeStateStoresForPartitions( + StateManagerUtil.reinitializeStateStoresForPartitions( log, + eosEnabled, + baseDir, globalStores, topology.storeToChangelogTopic(), partitions, - processorContext); + processorContext, + checkpointFile, + checkpointFileCache + ); globalConsumer.assign(partitions); globalConsumer.seekToBeginning(partitions); @@ -261,7 +279,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob final RecordConverter recordConverter) { for (final TopicPartition topicPartition : topicPartitions) { globalConsumer.assign(Collections.singletonList(topicPartition)); - final Long checkpoint = checkpointableOffsets.get(topicPartition); + final Long checkpoint = checkpointFileCache.get(topicPartition); if (checkpoint != null) { globalConsumer.seek(topicPartition, checkpoint); } else { @@ -293,14 +311,14 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob log.warn("Restoring GlobalStore {} failed due to: {}. Deleting global store to recreate from scratch.", storeName, recoverableException.toString()); - reinitializeStateStoresForPartitions(recoverableException.partitions(), processorContext); + reinitializeStateStoresForPartitions(recoverableException.partitions(), globalProcessorContext); stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark); restoreCount = 0L; } } stateRestoreListener.onRestoreEnd(topicPartition, storeName, restoreCount); - checkpointableOffsets.put(topicPartition, offset); + checkpointFileCache.put(topicPartition, offset); } } @@ -313,7 +331,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob try { log.trace("Flushing global store={}", store.name()); store.flush(); - } catch (final Exception e) { + } catch (final RuntimeException e) { throw new ProcessorStateException( String.format("Failed to flush global state store %s", store.name()), e @@ -338,12 +356,12 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob log.debug("Closing global storage engine {}", entry.getKey()); try { entry.getValue().get().close(); - } catch (final Exception e) { + } catch (final RuntimeException e) { log.error("Failed to close global state store {}", entry.getKey(), e); closeFailed.append("Failed to close global state store:") .append(entry.getKey()) .append(". 
Reason: ") - .append(e.toString()) + .append(e) .append("\n"); } globalStores.put(entry.getKey(), Optional.empty()); @@ -361,12 +379,12 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob @Override public void checkpoint(final Map<TopicPartition, Long> offsets) { - checkpointableOffsets.putAll(offsets); + checkpointFileCache.putAll(offsets); final Map<TopicPartition, Long> filteredOffsets = new HashMap<>(); // Skip non persistent store - for (final Map.Entry<TopicPartition, Long> topicPartitionOffset : checkpointableOffsets.entrySet()) { + for (final Map.Entry<TopicPartition, Long> topicPartitionOffset : checkpointFileCache.entrySet()) { final String topic = topicPartitionOffset.getKey().topic(); if (!globalNonPersistentStoresTopics.contains(topic)) { filteredOffsets.put(topicPartitionOffset.getKey(), topicPartitionOffset.getValue()); @@ -374,15 +392,15 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob } try { - checkpoint.write(filteredOffsets); + checkpointFile.write(filteredOffsets); } catch (final IOException e) { - log.warn("Failed to write offset checkpoint file to {} for global stores: {}", checkpoint, e); + log.warn("Failed to write offset checkpoint file to {} for global stores: {}", checkpointFile, e); } } @Override public Map<TopicPartition, Long> checkpointed() { - return Collections.unmodifiableMap(checkpointableOffsets); + return Collections.unmodifiableMap(checkpointFileCache); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index f060d29..7e16abc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -33,15 +33,20 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; +import static java.util.Collections.emptyMap; import static java.util.Collections.unmodifiableList; +import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME; +import static org.apache.kafka.streams.processor.internals.StateManagerUtil.converterForStore; import static org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter.adapt; -public class ProcessorStateManager extends AbstractStateManager { +public class ProcessorStateManager implements StateManager { private static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog"; private final Logger log; @@ -57,16 +62,23 @@ public class ProcessorStateManager extends AbstractStateManager { // must be maintained in topological order private final FixedOrderMap<String, Optional<StateStore>> registeredStores = new FixedOrderMap<>(); + private final FixedOrderMap<String, Optional<StateStore>> globalStores = new FixedOrderMap<>(); private final List<TopicPartition> changelogPartitions = new ArrayList<>(); // TODO: this map does not work with customized grouper where multiple partitions - // of the same topic can be assigned to the same topic. + // of the same topic can be assigned to the same task. 
private final Map<String, TopicPartition> partitionForTopic; + private final boolean eosEnabled; + private final File baseDir; + private OffsetCheckpoint checkpointFile; + private final Map<TopicPartition, Long> checkpointFileCache = new HashMap<>(); + private final Map<TopicPartition, Long> initialLoadedCheckpoints; + /** * @throws ProcessorStateException if the task directory does not exist and could not be created - * @throws IOException if any severe error happens while creating or locking the state directory + * @throws IOException if any severe error happens while creating or locking the state directory */ public ProcessorStateManager(final TaskId taskId, final Collection<TopicPartition> sources, @@ -76,9 +88,9 @@ public class ProcessorStateManager extends AbstractStateManager { final ChangelogReader changelogReader, final boolean eosEnabled, final LogContext logContext) throws IOException { - super(stateDirectory.directoryForTask(taskId), eosEnabled); + this.eosEnabled = eosEnabled; - this.log = logContext.logger(ProcessorStateManager.class); + log = logContext.logger(ProcessorStateManager.class); this.taskId = taskId; this.changelogReader = changelogReader; logPrefix = String.format("task [%s] ", taskId); @@ -94,15 +106,17 @@ public class ProcessorStateManager extends AbstractStateManager { recordConverters = isStandby ? new HashMap<>() : null; this.storeToChangelogTopic = new HashMap<>(storeToChangelogTopic); - // load the checkpoint information - checkpointableOffsets.putAll(checkpoint.read()); + baseDir = stateDirectory.directoryForTask(taskId); + checkpointFile = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME)); + initialLoadedCheckpoints = checkpointFile.read(); - log.trace("Checkpointable offsets read from checkpoint: {}", checkpointableOffsets); + log.trace("Checkpointable offsets read from checkpoint: {}", initialLoadedCheckpoints); if (eosEnabled) { - // delete the checkpoint file after finish loading its stored offsets - checkpoint.delete(); - checkpoint = null; + // with EOS enabled, there should never be a checkpoint file _during_ processing. + // delete the checkpoint file after loading its stored offsets. 
+ checkpointFile.delete(); + checkpointFile = null; } log.debug("Created state store manager for task {} with the acquired state dir lock", taskId); @@ -126,7 +140,7 @@ public class ProcessorStateManager extends AbstractStateManager { log.debug("Registering state store {} to its state manager", storeName); if (CHECKPOINT_FILE_NAME.equals(storeName)) { - throw new IllegalArgumentException(String.format("%sIllegal store name: %s", logPrefix, CHECKPOINT_FILE_NAME)); + throw new IllegalArgumentException(String.format("%sIllegal store name: %s", logPrefix, storeName)); } if (registeredStores.containsKey(storeName) && registeredStores.get(storeName).isPresent()) { @@ -135,10 +149,7 @@ public class ProcessorStateManager extends AbstractStateManager { // check that the underlying change log topic exist or not final String topic = storeToChangelogTopic.get(storeName); - if (topic == null) { - registeredStores.put(storeName, Optional.of(store)); - } else { - + if (topic != null) { final TopicPartition storePartition = new TopicPartition(topic, getPartition(topic)); final RecordConverter recordConverter = converterForStore(store); @@ -149,12 +160,16 @@ public class ProcessorStateManager extends AbstractStateManager { restoreCallbacks.put(topic, stateRestoreCallback); recordConverters.put(topic, recordConverter); } else { - log.trace("Restoring state store {} from changelog topic {} at checkpoint {}", storeName, topic, checkpointableOffsets.get(storePartition)); + final Long restoreCheckpoint = store.persistent() ? initialLoadedCheckpoints.get(storePartition) : null; + if (restoreCheckpoint != null) { + checkpointFileCache.put(storePartition, restoreCheckpoint); + } + log.trace("Restoring state store {} from changelog topic {} at checkpoint {}", storeName, topic, restoreCheckpoint); final StateRestorer restorer = new StateRestorer( storePartition, new CompositeRestoreListener(stateRestoreCallback), - checkpointableOffsets.get(storePartition), + restoreCheckpoint, offsetLimit(storePartition), store.persistent(), storeName, @@ -164,24 +179,38 @@ public class ProcessorStateManager extends AbstractStateManager { changelogReader.register(restorer); } changelogPartitions.add(storePartition); - - registeredStores.put(storeName, Optional.of(store)); } + + registeredStores.put(storeName, Optional.of(store)); } @Override public void reinitializeStateStoresForPartitions(final Collection<TopicPartition> partitions, final InternalProcessorContext processorContext) { - reinitializeStateStoresForPartitions( - log, - registeredStores, - storeToChangelogTopic, - partitions, - processorContext); + StateManagerUtil.reinitializeStateStoresForPartitions(log, + eosEnabled, + baseDir, + registeredStores, + storeToChangelogTopic, + partitions, + processorContext, + checkpointFile, + checkpointFileCache + ); + } + + void clearCheckpoints() throws IOException { + if (checkpointFile != null) { + checkpointFile.delete(); + checkpointFile = null; + + checkpointFileCache.clear(); + } } @Override public Map<TopicPartition, Long> checkpointed() { + updateCheckpointFileCache(emptyMap()); final Map<TopicPartition, Long> partitionsAndOffsets = new HashMap<>(); for (final Map.Entry<String, StateRestoreCallback> entry : restoreCallbacks.entrySet()) { @@ -189,7 +218,7 @@ public class ProcessorStateManager extends AbstractStateManager { final int partition = getPartition(topicName); final TopicPartition storePartition = new TopicPartition(topicName, partition); - partitionsAndOffsets.put(storePartition, 
checkpointableOffsets.getOrDefault(storePartition, -1L)); + partitionsAndOffsets.put(storePartition, checkpointFileCache.getOrDefault(storePartition, -1L)); } return partitionsAndOffsets; } @@ -209,7 +238,7 @@ public class ProcessorStateManager extends AbstractStateManager { try { restoreCallback.restoreBatch(convertedRecords); - } catch (final Exception e) { + } catch (final RuntimeException e) { throw new ProcessorStateException(String.format("%sException caught while trying to restore state from %s", logPrefix, storePartition), e); } } @@ -246,7 +275,7 @@ public class ProcessorStateManager extends AbstractStateManager { log.trace("Flushing store {}", store.name()); try { store.flush(); - } catch (final Exception e) { + } catch (final RuntimeException e) { if (firstException == null) { firstException = new ProcessorStateException(String.format("%sFailed to flush state store %s", logPrefix, store.name()), e); } @@ -266,6 +295,7 @@ public class ProcessorStateManager extends AbstractStateManager { /** * {@link StateStore#close() Close} all stores (even in case of failure). * Log all exception and re-throw the first exception that did occur at the end. + * * @throws ProcessorStateException if any error happens when closing the state stores */ @Override @@ -282,7 +312,7 @@ public class ProcessorStateManager extends AbstractStateManager { try { store.close(); registeredStores.put(store.name(), Optional.empty()); - } catch (final Exception e) { + } catch (final RuntimeException e) { if (firstException == null) { firstException = new ProcessorStateException(String.format("%sFailed to close state store %s", logPrefix, store.name()), e); } @@ -294,11 +324,10 @@ public class ProcessorStateManager extends AbstractStateManager { } } - if (!clean && eosEnabled && checkpoint != null) { + if (!clean && eosEnabled) { // delete the checkpoint file if this is an unclean close try { - checkpoint.delete(); - checkpoint = null; + clearCheckpoints(); } catch (final IOException e) { throw new ProcessorStateException(String.format("%sError while deleting the checkpoint file", logPrefix), e); } @@ -309,44 +338,56 @@ public class ProcessorStateManager extends AbstractStateManager { } } - // write the checkpoint @Override - public void checkpoint(final Map<TopicPartition, Long> checkpointableOffsets) { - this.checkpointableOffsets.putAll(changelogReader.restoredOffsets()); - log.trace("Checkpointable offsets updated with restored offsets: {}", this.checkpointableOffsets); - for (final Map.Entry<String, Optional<StateStore>> entry : registeredStores.entrySet()) { - if (entry.getValue().isPresent()) { - final StateStore store = entry.getValue().get(); - final String storeName = store.name(); - // only checkpoint the offset to the offsets file if - // it is persistent AND changelog enabled - if (store.persistent() && storeToChangelogTopic.containsKey(storeName)) { - final String changelogTopic = storeToChangelogTopic.get(storeName); - final TopicPartition topicPartition = new TopicPartition(changelogTopic, getPartition(storeName)); - if (checkpointableOffsets.containsKey(topicPartition)) { - // store the last offset + 1 (the log position after restoration) - this.checkpointableOffsets.put(topicPartition, checkpointableOffsets.get(topicPartition) + 1); - } else if (standbyRestoredOffsets.containsKey(topicPartition)) { - this.checkpointableOffsets.put(topicPartition, standbyRestoredOffsets.get(topicPartition)); - } - } - } else { - throw new IllegalStateException("Expected " + entry.getKey() + " to have been 
initialized"); - } - } - - log.trace("Checkpointable offsets updated with active acked offsets: {}", this.checkpointableOffsets); + public void checkpoint(final Map<TopicPartition, Long> checkpointableOffsetsFromProcessing) { + ensureStoresRegistered(); // write the checkpoint file before closing - if (checkpoint == null) { - checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME)); + if (checkpointFile == null) { + checkpointFile = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME)); } - log.trace("Writing checkpoint: {}", this.checkpointableOffsets); + updateCheckpointFileCache(checkpointableOffsetsFromProcessing); + + log.trace("Checkpointable offsets updated with active acked offsets: {}", checkpointFileCache); + + log.trace("Writing checkpoint: {}", checkpointFileCache); try { - checkpoint.write(this.checkpointableOffsets); + checkpointFile.write(checkpointFileCache); } catch (final IOException e) { - log.warn("Failed to write offset checkpoint file to [{}]", checkpoint, e); + log.warn("Failed to write offset checkpoint file to [{}]", checkpointFile, e); + } + } + + private void updateCheckpointFileCache(final Map<TopicPartition, Long> checkpointableOffsetsFromProcessing) { + final Set<TopicPartition> validCheckpointableTopics = validCheckpointableTopics(); + final Map<TopicPartition, Long> restoredOffsets = validCheckpointableOffsets( + changelogReader.restoredOffsets(), + validCheckpointableTopics + ); + log.trace("Checkpointable offsets updated with restored offsets: {}", checkpointFileCache); + for (final TopicPartition topicPartition : validCheckpointableTopics) { + if (checkpointableOffsetsFromProcessing.containsKey(topicPartition)) { + // if we have just recently processed some offsets, + // store the last offset + 1 (the log position after restoration) + checkpointFileCache.put(topicPartition, checkpointableOffsetsFromProcessing.get(topicPartition) + 1); + } else if (standbyRestoredOffsets.containsKey(topicPartition)) { + // or if we restored some offset as a standby task, use it + checkpointFileCache.put(topicPartition, standbyRestoredOffsets.get(topicPartition)); + } else if (restoredOffsets.containsKey(topicPartition)) { + // or if we restored some offset as an active task, use it + checkpointFileCache.put(topicPartition, restoredOffsets.get(topicPartition)); + } else if (checkpointFileCache.containsKey(topicPartition)) { + // or if we have a prior value we've cached (and written to the checkpoint file), then keep it + } else { + // As a last resort, fall back to the offset we loaded from the checkpoint file at startup, but + // only if the offset is actually valid for our current state stores. 
+ final Long loadedOffset = + validCheckpointableOffsets(initialLoadedCheckpoints, validCheckpointableTopics).get(topicPartition); + if (loadedOffset != null) { + checkpointFileCache.put(topicPartition, loadedOffset); + } + } } } @@ -380,4 +421,38 @@ public class ProcessorStateManager extends AbstractStateManager { } } } + + private Set<TopicPartition> validCheckpointableTopics() { + // it's only valid to record checkpoints for registered stores that are both persistent and change-logged + + final Set<TopicPartition> result = new HashSet<>(storeToChangelogTopic.size()); + for (final Map.Entry<String, String> storeToChangelog : storeToChangelogTopic.entrySet()) { + final String storeName = storeToChangelog.getKey(); + if (registeredStores.containsKey(storeName) + && registeredStores.get(storeName).isPresent() + && registeredStores.get(storeName).get().persistent()) { + + final String changelogTopic = storeToChangelog.getValue(); + result.add(new TopicPartition(changelogTopic, getPartition(changelogTopic))); + } + } + return result; + } + + private static Map<TopicPartition, Long> validCheckpointableOffsets( + final Map<TopicPartition, Long> checkpointableOffsets, + final Set<TopicPartition> validCheckpointableTopics) { + + final Map<TopicPartition, Long> result = new HashMap<>(checkpointableOffsets.size()); + + for (final Map.Entry<TopicPartition, Long> topicToCheckpointableOffset : checkpointableOffsets.entrySet()) { + final TopicPartition topic = topicToCheckpointableOffset.getKey(); + if (validCheckpointableTopics.contains(topic)) { + final Long checkpointableOffset = topicToCheckpointableOffset.getValue(); + result.put(topic, checkpointableOffset); + } + } + + return result; + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java similarity index 78% rename from streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java rename to streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java index 2190c43..4568865 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java @@ -38,45 +38,40 @@ import static org.apache.kafka.streams.state.internals.RecordConverters.identity import static org.apache.kafka.streams.state.internals.RecordConverters.rawValueToTimestampedValue; import static org.apache.kafka.streams.state.internals.WrappedStateStore.isTimestamped; -abstract class AbstractStateManager implements StateManager { +final class StateManagerUtil { static final String CHECKPOINT_FILE_NAME = ".checkpoint"; - final File baseDir; - final boolean eosEnabled; - OffsetCheckpoint checkpoint; - - final Map<TopicPartition, Long> checkpointableOffsets = new HashMap<>(); - final FixedOrderMap<String, Optional<StateStore>> globalStores = new FixedOrderMap<>(); - - AbstractStateManager(final File baseDir, - final boolean eosEnabled) { - this.baseDir = baseDir; - this.eosEnabled = eosEnabled; - this.checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME)); - } + private StateManagerUtil() {} static RecordConverter converterForStore(final StateStore store) { return isTimestamped(store) ? 
rawValueToTimestampedValue() : identity(); } - public void reinitializeStateStoresForPartitions(final Logger log, - final FixedOrderMap<String, Optional<StateStore>> stateStores, - final Map<String, String> storeToChangelogTopic, - final Collection<TopicPartition> partitions, - final InternalProcessorContext processorContext) { + public static void reinitializeStateStoresForPartitions(final Logger log, + final boolean eosEnabled, + final File baseDir, + final FixedOrderMap<String, Optional<StateStore>> stateStores, + final Map<String, String> storeToChangelogTopic, + final Collection<TopicPartition> partitions, + final InternalProcessorContext processorContext, + final OffsetCheckpoint checkpointFile, + final Map<TopicPartition, Long> checkpointFileCache) { final Map<String, String> changelogTopicToStore = inverseOneToOneMap(storeToChangelogTopic); final Set<String> storesToBeReinitialized = new HashSet<>(); for (final TopicPartition topicPartition : partitions) { - checkpointableOffsets.remove(topicPartition); + checkpointFileCache.remove(topicPartition); storesToBeReinitialized.add(changelogTopicToStore.get(topicPartition.topic())); } if (!eosEnabled) { try { - checkpoint.write(checkpointableOffsets); + checkpointFile.write(checkpointFileCache); } catch (final IOException fatalException) { - log.error("Failed to write offset checkpoint file to {} while re-initializing {}: {}", checkpoint, stateStores, fatalException); + log.error("Failed to write offset checkpoint file to {} while re-initializing {}: {}", + checkpointFile, + stateStores, + fatalException); throw new StreamsException("Failed to reinitialize global store.", fatalException); } } @@ -122,7 +117,7 @@ abstract class AbstractStateManager implements StateManager { } } - private Map<String, String> inverseOneToOneMap(final Map<String, String> origin) { + private static Map<String, String> inverseOneToOneMap(final Map<String, String> origin) { final Map<String, String> reversedMap = new HashMap<>(); for (final Map.Entry<String, String> entry : origin.entrySet()) { reversedMap.put(entry.getValue(), entry.getKey()); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 4fd57ba..59d7503 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -286,13 +286,10 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator initializeTransactions(); recordCollector.init(producer); - if (stateMgr.checkpoint != null) { - try { - stateMgr.checkpoint.delete(); - stateMgr.checkpoint = null; - } catch (final IOException e) { - throw new ProcessorStateException(String.format("%sError while deleting the checkpoint file", logPrefix), e); - } + try { + stateMgr.clearCheckpoints(); + } catch (final IOException e) { + throw new ProcessorStateException(format("%sError while deleting the checkpoint file", logPrefix), e); } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 86a8e49..c136fdb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -208,7 +208,7 @@ public class TaskManager { try { final 
TaskId id = TaskId.parse(dir.getName()); // if the checkpoint file exists, the state is valid. - if (new File(dir, ProcessorStateManager.CHECKPOINT_FILE_NAME).exists()) { + if (new File(dir, StateManagerUtil.CHECKPOINT_FILE_NAME).exists()) { tasks.add(id); } } catch (final TaskIdFormatException e) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java index 405831a..93a0561 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java @@ -18,6 +18,8 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.BufferedReader; import java.io.BufferedWriter; @@ -50,6 +52,7 @@ import java.util.regex.Pattern; * separated by spaces. */ public class OffsetCheckpoint { + private static final Logger LOG = LoggerFactory.getLogger(OffsetCheckpoint.class); private static final Pattern WHITESPACE_MINIMUM_ONCE = Pattern.compile("\\s+"); @@ -75,6 +78,7 @@ public class OffsetCheckpoint { synchronized (lock) { // write to temp file and then swap with the existing file final File temp = new File(file.getAbsolutePath() + ".tmp"); + LOG.trace("Writing tmp checkpoint file {}", temp.getAbsolutePath()); final FileOutputStream fileOutputStream = new FileOutputStream(temp); try (final BufferedWriter writer = new BufferedWriter( @@ -90,6 +94,7 @@ public class OffsetCheckpoint { fileOutputStream.getFD().sync(); } + LOG.trace("Swapping tmp checkpoint file {} {}", temp.toPath(), file.toPath()); Utils.atomicMoveWithFallback(temp.toPath(), file.toPath()); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java index 84b11fa..8e3eb39 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java @@ -137,7 +137,7 @@ public class GlobalStateManagerImplTest { streamsConfig); processorContext = new InternalMockProcessorContext(stateDirectory.globalStateDir(), streamsConfig); stateManager.setGlobalProcessorContext(processorContext); - checkpointFile = new File(stateManager.baseDir(), ProcessorStateManager.CHECKPOINT_FILE_NAME); + checkpointFile = new File(stateManager.baseDir(), StateManagerUtil.CHECKPOINT_FILE_NAME); } @After @@ -336,7 +336,7 @@ public class GlobalStateManagerImplTest { initializeConsumer(5, 5, t1); final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(new File(stateManager.baseDir(), - ProcessorStateManager.CHECKPOINT_FILE_NAME)); + StateManagerUtil.CHECKPOINT_FILE_NAME)); offsetCheckpoint.write(Collections.singletonMap(t1, 5L)); stateManager.initialize(); @@ -560,7 +560,7 @@ public class GlobalStateManagerImplTest { private Map<TopicPartition, Long> readOffsetsCheckpoint() throws IOException { final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(new File(stateManager.baseDir(), - ProcessorStateManager.CHECKPOINT_FILE_NAME)); + StateManagerUtil.CHECKPOINT_FILE_NAME)); return offsetCheckpoint.read(); } @@ -717,7 +717,7 @@ public class GlobalStateManagerImplTest { } private 
void writeCorruptCheckpoint() throws IOException { - final File checkpointFile = new File(stateManager.baseDir(), ProcessorStateManager.CHECKPOINT_FILE_NAME); + final File checkpointFile = new File(stateManager.baseDir(), StateManagerUtil.CHECKPOINT_FILE_NAME); try (final OutputStream stream = Files.newOutputStream(checkpointFile.toPath())) { stream.write("0\n1\nfoo".getBytes()); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java index 6c3be61..1330967 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java @@ -26,6 +26,7 @@ import java.util.Set; public class MockChangelogReader implements ChangelogReader { private final Set<TopicPartition> registered = new HashSet<>(); + private Map<TopicPartition, Long> restoredOffsets = Collections.emptyMap(); @Override public void register(final StateRestorer restorer) { @@ -39,7 +40,11 @@ public class MockChangelogReader implements ChangelogReader { @Override public Map<TopicPartition, Long> restoredOffsets() { - return Collections.emptyMap(); + return restoredOffsets; + } + + void setRestoredOffsets(final Map<TopicPartition, Long> restoredOffsets) { + this.restoredOffsets = restoredOffsets; } @Override diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java index 5c26dd2..7b94454 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java @@ -107,7 +107,7 @@ public class ProcessorStateManagerTest { put(StreamsConfig.STATE_DIR_CONFIG, baseDir.getPath()); } }), new MockTime(), true); - checkpointFile = new File(stateDirectory.directoryForTask(taskId), ProcessorStateManager.CHECKPOINT_FILE_NAME); + checkpointFile = new File(stateDirectory.directoryForTask(taskId), StateManagerUtil.CHECKPOINT_FILE_NAME); checkpoint = new OffsetCheckpoint(checkpointFile); } @@ -240,7 +240,7 @@ public class ProcessorStateManagerTest { @Test public void testChangeLogOffsets() throws IOException { final TaskId taskId = new TaskId(0, 0); - final long lastCheckpointedOffset = 10L; + final long storeTopic1LoadedCheckpoint = 10L; final String storeName1 = "store1"; final String storeName2 = "store2"; final String storeName3 = "store3"; @@ -254,8 +254,10 @@ public class ProcessorStateManagerTest { storeToChangelogTopic.put(storeName2, storeTopicName2); storeToChangelogTopic.put(storeName3, storeTopicName3); - final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(stateDirectory.directoryForTask(taskId), ProcessorStateManager.CHECKPOINT_FILE_NAME)); - checkpoint.write(singletonMap(new TopicPartition(storeTopicName1, 0), lastCheckpointedOffset)); + final OffsetCheckpoint checkpoint = new OffsetCheckpoint( + new File(stateDirectory.directoryForTask(taskId), StateManagerUtil.CHECKPOINT_FILE_NAME) + ); + checkpoint.write(singletonMap(new TopicPartition(storeTopicName1, 0), storeTopic1LoadedCheckpoint)); final TopicPartition partition1 = new TopicPartition(storeTopicName1, 0); final TopicPartition partition2 = new TopicPartition(storeTopicName2, 0); @@ -289,7 +291,7 @@ public 
class ProcessorStateManagerTest { assertTrue(changeLogOffsets.containsKey(partition1)); assertTrue(changeLogOffsets.containsKey(partition2)); assertTrue(changeLogOffsets.containsKey(partition3)); - assertEquals(lastCheckpointedOffset, (long) changeLogOffsets.get(partition1)); + assertEquals(storeTopic1LoadedCheckpoint, (long) changeLogOffsets.get(partition1)); assertEquals(-1L, (long) changeLogOffsets.get(partition2)); assertEquals(-1L, (long) changeLogOffsets.get(partition3)); @@ -365,8 +367,7 @@ public class ProcessorStateManagerTest { // the checkpoint file should contain an offset from the persistent store only. final Map<TopicPartition, Long> checkpointedOffsets = checkpoint.read(); - assertEquals(1, checkpointedOffsets.size()); - assertEquals(new Long(124), checkpointedOffsets.get(new TopicPartition(persistentStoreTopicName, 1))); + assertThat(checkpointedOffsets, is(singletonMap(new TopicPartition(persistentStoreTopicName, 1), 124L))); } @Test @@ -442,6 +443,123 @@ public class ProcessorStateManagerTest { } @Test + public void shouldIgnoreIrrelevantLoadedCheckpoints() throws IOException { + final Map<TopicPartition, Long> offsets = mkMap( + mkEntry(persistentStorePartition, 99L), + mkEntry(new TopicPartition("ignoreme", 1234), 12L) + ); + checkpoint.write(offsets); + + final MockKeyValueStore persistentStore = new MockKeyValueStore(persistentStoreName, true); + final ProcessorStateManager stateMgr = new ProcessorStateManager( + taskId, + noPartitions, + false, + stateDirectory, + singletonMap(persistentStoreName, persistentStorePartition.topic()), + changelogReader, + false, + logContext); + stateMgr.register(persistentStore, persistentStore.stateRestoreCallback); + + changelogReader.setRestoredOffsets(singletonMap(persistentStorePartition, 110L)); + + stateMgr.checkpoint(emptyMap()); + stateMgr.close(true); + final Map<TopicPartition, Long> read = checkpoint.read(); + assertThat(read, equalTo(singletonMap(persistentStorePartition, 110L))); + } + + @Test + public void shouldOverrideLoadedCheckpointsWithRestoredCheckpoints() throws IOException { + final Map<TopicPartition, Long> offsets = singletonMap(persistentStorePartition, 99L); + checkpoint.write(offsets); + + final MockKeyValueStore persistentStore = new MockKeyValueStore(persistentStoreName, true); + final ProcessorStateManager stateMgr = new ProcessorStateManager( + taskId, + noPartitions, + false, + stateDirectory, + singletonMap(persistentStoreName, persistentStorePartition.topic()), + changelogReader, + false, + logContext); + stateMgr.register(persistentStore, persistentStore.stateRestoreCallback); + + changelogReader.setRestoredOffsets(singletonMap(persistentStorePartition, 110L)); + + stateMgr.checkpoint(emptyMap()); + stateMgr.close(true); + final Map<TopicPartition, Long> read = checkpoint.read(); + assertThat(read, equalTo(singletonMap(persistentStorePartition, 110L))); + } + + @Test + public void shouldIgnoreIrrelevantRestoredCheckpoints() throws IOException { + final Map<TopicPartition, Long> offsets = singletonMap(persistentStorePartition, 99L); + checkpoint.write(offsets); + + final MockKeyValueStore persistentStore = new MockKeyValueStore(persistentStoreName, true); + final ProcessorStateManager stateMgr = new ProcessorStateManager( + taskId, + noPartitions, + false, + stateDirectory, + singletonMap(persistentStoreName, persistentStorePartition.topic()), + changelogReader, + false, + logContext); + stateMgr.register(persistentStore, persistentStore.stateRestoreCallback); + + // should ignore irrelevant topic 
partitions + changelogReader.setRestoredOffsets(mkMap( + mkEntry(persistentStorePartition, 110L), + mkEntry(new TopicPartition("sillytopic", 5000), 1234L) + )); + + stateMgr.checkpoint(emptyMap()); + stateMgr.close(true); + final Map<TopicPartition, Long> read = checkpoint.read(); + assertThat(read, equalTo(singletonMap(persistentStorePartition, 110L))); + } + + @Test + public void shouldOverrideRestoredOffsetsWithProcessedOffsets() throws IOException { + final Map<TopicPartition, Long> offsets = singletonMap(persistentStorePartition, 99L); + checkpoint.write(offsets); + + final MockKeyValueStore persistentStore = new MockKeyValueStore(persistentStoreName, true); + final ProcessorStateManager stateMgr = new ProcessorStateManager( + taskId, + noPartitions, + false, + stateDirectory, + singletonMap(persistentStoreName, persistentStorePartition.topic()), + changelogReader, + false, + logContext); + stateMgr.register(persistentStore, persistentStore.stateRestoreCallback); + + // should ignore irrelevant topic partitions + changelogReader.setRestoredOffsets(mkMap( + mkEntry(persistentStorePartition, 110L), + mkEntry(new TopicPartition("sillytopic", 5000), 1234L) + )); + + // should ignore irrelevant topic partitions + stateMgr.checkpoint(mkMap( + mkEntry(persistentStorePartition, 220L), + mkEntry(new TopicPartition("ignoreme", 42), 9000L) + )); + stateMgr.close(true); + final Map<TopicPartition, Long> read = checkpoint.read(); + + // the checkpoint gets incremented to be the log position _after_ the committed offset + assertThat(read, equalTo(singletonMap(persistentStorePartition, 221L))); + } + + @Test public void shouldWriteCheckpointForPersistentLogEnabledStore() throws IOException { final ProcessorStateManager stateMgr = new ProcessorStateManager( taskId, @@ -540,7 +658,7 @@ public class ProcessorStateManagerTest { logContext); try { - stateManager.register(new MockKeyValueStore(ProcessorStateManager.CHECKPOINT_FILE_NAME, true), null); + stateManager.register(new MockKeyValueStore(StateManagerUtil.CHECKPOINT_FILE_NAME, true), null); fail("should have thrown illegal argument exception when store name same as checkpoint file"); } catch (final IllegalArgumentException e) { //pass diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index 0e50120..0400128 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -443,7 +443,7 @@ public class StandbyTaskTest { task.close(true, false); final File taskDir = stateDirectory.directoryForTask(taskId); - final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(taskDir, ProcessorStateManager.CHECKPOINT_FILE_NAME)); + final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(taskDir, StateManagerUtil.CHECKPOINT_FILE_NAME)); final Map<TopicPartition, Long> offsets = checkpoint.read(); assertEquals(1, offsets.size()); @@ -637,7 +637,7 @@ public class StandbyTaskTest { task.commit(); final Map<TopicPartition, Long> checkpoint = new OffsetCheckpoint( - new File(stateDirectory.directoryForTask(taskId), ProcessorStateManager.CHECKPOINT_FILE_NAME) + new File(stateDirectory.directoryForTask(taskId), StateManagerUtil.CHECKPOINT_FILE_NAME) ).read(); assertThat(checkpoint, equalTo(Collections.singletonMap(globalTopicPartition, 51L))); diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index ab9a47e..a70c10c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -861,7 +861,7 @@ public class StreamTaskTest { task.initializeTopology(); task.commit(); final OffsetCheckpoint checkpoint = new OffsetCheckpoint( - new File(stateDirectory.directoryForTask(taskId00), ProcessorStateManager.CHECKPOINT_FILE_NAME) + new File(stateDirectory.directoryForTask(taskId00), StateManagerUtil.CHECKPOINT_FILE_NAME) ); assertThat(checkpoint.read(), equalTo(Collections.singletonMap(changelogPartition, offset))); @@ -875,7 +875,7 @@ public class StreamTaskTest { task.commit(); final File checkpointFile = new File( stateDirectory.directoryForTask(taskId00), - ProcessorStateManager.CHECKPOINT_FILE_NAME + StateManagerUtil.CHECKPOINT_FILE_NAME ); assertFalse(checkpointFile.exists()); @@ -1477,6 +1477,8 @@ public class StreamTaskTest { } private StreamTask createStatefulTask(final StreamsConfig config, final boolean logged) { + final StateStore stateStore = new MockKeyValueStore(storeName, logged); + final ProcessorTopology topology = ProcessorTopologyFactories.with( asList(source1, source2), mkMap(mkEntry(topic1, source1), mkEntry(topic2, source2)), diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 7c17ca7..d1d6671 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -89,7 +89,7 @@ import static java.util.Collections.singletonList; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkProperties; -import static org.apache.kafka.streams.processor.internals.AbstractStateManager.CHECKPOINT_FILE_NAME; +import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME; import static org.apache.kafka.streams.processor.internals.StreamThread.getSharedAdminClientId; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index fcf275b..7d7d4e6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -218,9 +218,9 @@ public class TaskManagerTest { testFolder.newFolder("1_1"), testFolder.newFolder("dummy")).toArray(new File[0]); - assertTrue((new File(taskFolders[0], ProcessorStateManager.CHECKPOINT_FILE_NAME)).createNewFile()); - assertTrue((new File(taskFolders[1], ProcessorStateManager.CHECKPOINT_FILE_NAME)).createNewFile()); - assertTrue((new File(taskFolders[3], ProcessorStateManager.CHECKPOINT_FILE_NAME)).createNewFile()); + assertTrue((new File(taskFolders[0], StateManagerUtil.CHECKPOINT_FILE_NAME)).createNewFile()); + assertTrue((new File(taskFolders[1], 
StateManagerUtil.CHECKPOINT_FILE_NAME)).createNewFile()); + assertTrue((new File(taskFolders[3], StateManagerUtil.CHECKPOINT_FILE_NAME)).createNewFile()); expect(activeTaskCreator.stateDirectory()).andReturn(stateDirectory).once(); expect(stateDirectory.listTaskDirectories()).andReturn(taskFolders).once();