This is an automated email from the ASF dual-hosted git repository. pmaheshwari pushed a commit to branch 1.3.0 in repository https://gitbox.apache.org/repos/asf/samza.git
commit cb2707f4151cfa5e58eddc46ffc38cb633397349 Author: Prateek Maheshwari <[email protected]> AuthorDate: Tue Oct 22 17:55:54 2019 -0700 SAMZA-2356: [Transactional State] Do not trim changelog if time since last checkpoint is greater than min.compaction.lag. (#1196) --- .../org/apache/samza/checkpoint/CheckpointId.java | 82 +++++++ .../checkpoint/CheckpointedChangelogOffset.java | 82 +++++++ .../org/apache/samza/storage/StorageEngine.java | 5 +- .../org/apache/samza/storage/kv/KeyValueStore.java | 5 +- .../java/org/apache/samza/config/TaskConfig.java | 8 +- .../operators/util/InternalInMemoryStore.java | 3 +- .../TransactionalStateTaskRestoreManager.java | 62 ++++-- .../org/apache/samza/container/TaskInstance.scala | 23 +- .../NonTransactionalStateTaskStorageManager.scala | 8 +- .../apache/samza/storage/TaskStorageManager.scala | 5 +- .../TransactionalStateTaskStorageManager.scala | 12 +- .../operators/impl/store/TestInMemoryStore.java | 4 +- .../apache/samza/storage/MockStorageEngine.java | 3 +- .../TestTransactionalStateTaskRestoreManager.java | 241 +++++++++++++++++++-- .../apache/samza/container/TestTaskInstance.scala | 22 +- .../samza/storage/TestContainerStorageManager.java | 2 +- .../TestTransactionalStateTaskStorageManager.java | 20 +- .../kv/inmemory/InMemoryKeyValueStore.scala | 4 +- .../samza/storage/kv/RocksDbKeyValueStore.scala | 12 +- .../samza/storage/kv/LargeMessageSafeStore.java | 3 +- .../samza/storage/kv/AccessLoggedStore.scala | 3 +- .../org/apache/samza/storage/kv/CachedStore.scala | 5 +- .../samza/storage/kv/KeyValueStorageEngine.scala | 4 +- .../org/apache/samza/storage/kv/LoggedStore.scala | 3 +- .../samza/storage/kv/NullSafeKeyValueStore.scala | 4 +- .../samza/storage/kv/SerializedKeyValueStore.scala | 4 +- .../samza/storage/kv/MockKeyValueStore.scala | 4 +- .../kv/TransactionalStateIntegrationTest.java | 2 +- ...ransactionalStateMultiStoreIntegrationTest.java | 2 +- 29 files changed, 528 insertions(+), 109 deletions(-) diff --git 
a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointId.java b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointId.java new file mode 100644 index 0000000..95dfd24 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointId.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.checkpoint; + +import java.util.Objects; +import org.apache.commons.lang3.StringUtils; +import org.apache.samza.annotation.InterfaceStability; + +/** + * Checkpoint ID has the format: [currentTimeMillis, last 6 digits of nanotime], separated by a dash. + * This is to avoid conflicts, e.g when requesting frequent manual commits. + * + * It is expected that persistent stores use the {@link #toString()} representation of the checkpoint id + * as the store checkpoint directory name. 
+ */ +@InterfaceStability.Unstable +public class CheckpointId { + public static final String SEPARATOR = "-"; + + private final long millis; + private final long nanos; + + public CheckpointId(long millis, long nanos) { + this.millis = millis; + this.nanos = nanos; + } + + public static CheckpointId create() { + return new CheckpointId(System.currentTimeMillis(), System.nanoTime() % 1000000); + } + + public static CheckpointId fromString(String checkpointId) { + if (StringUtils.isBlank(checkpointId)) { + throw new IllegalArgumentException("Invalid checkpoint id: " + checkpointId); + } + String[] parts = checkpointId.split(SEPARATOR); + return new CheckpointId(Long.parseLong(parts[0]), Long.parseLong(parts[1])); + } + + public long getMillis() { + return millis; + } + + public long getNanos() { + return nanos; + } + + @Override + public String toString() { + return String.format("%s%s%s", millis, SEPARATOR, nanos); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + CheckpointId that = (CheckpointId) o; + return millis == that.millis && + nanos == that.nanos; + } + + @Override + public int hashCode() { + return Objects.hash(millis, nanos); + } +} \ No newline at end of file diff --git a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointedChangelogOffset.java b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointedChangelogOffset.java new file mode 100644 index 0000000..407ce7a --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointedChangelogOffset.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.checkpoint; + +import java.util.Objects; +import org.apache.commons.lang3.StringUtils; +import org.apache.samza.annotation.InterfaceStability; + +/** + * Checkpointed changelog offset has the format: [checkpointId, offset], separated by a colon. + */ +@InterfaceStability.Unstable +public class CheckpointedChangelogOffset { + public static final String SEPARATOR = ":"; + + private final CheckpointId checkpointId; + private final String offset; + + public CheckpointedChangelogOffset(CheckpointId checkpointId, String offset) { + this.checkpointId = checkpointId; + this.offset = offset; + } + + public static CheckpointedChangelogOffset fromString(String message) { + if (StringUtils.isBlank(message)) { + throw new IllegalArgumentException("Invalid checkpointed changelog message: " + message); + } + String[] checkpointIdAndOffset = message.split(":"); + if (checkpointIdAndOffset.length != 2) { + throw new IllegalArgumentException("Invalid checkpointed changelog offset: " + message); + } + CheckpointId checkpointId = CheckpointId.fromString(checkpointIdAndOffset[0]); + String offset = null; + if (!"null".equals(checkpointIdAndOffset[1])) { + offset = checkpointIdAndOffset[1]; + } + return new CheckpointedChangelogOffset(checkpointId, offset); + } + + public CheckpointId getCheckpointId() { + return checkpointId; + } + + public String getOffset() { + return offset; + } + + 
@Override + public String toString() { + return String.format("%s%s%s", checkpointId, SEPARATOR, offset); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + CheckpointedChangelogOffset that = (CheckpointedChangelogOffset) o; + return Objects.equals(checkpointId, that.checkpointId) && + Objects.equals(offset, that.offset); + } + + @Override + public int hashCode() { + return Objects.hash(checkpointId, offset); + } +} diff --git a/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java b/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java index 804a250..8add1de 100644 --- a/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java +++ b/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java @@ -22,6 +22,8 @@ package org.apache.samza.storage; import java.nio.file.Path; import java.util.Optional; +import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.checkpoint.CheckpointId; import org.apache.samza.system.ChangelogSSPIterator; /** @@ -55,7 +57,8 @@ public interface StorageEngine { /** * Checkpoint store snapshots. 
*/ - Optional<Path> checkpoint(String id); + @InterfaceStability.Unstable + Optional<Path> checkpoint(CheckpointId id); /** * Close the storage engine diff --git a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java index d262e29..41faac3 100644 --- a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java +++ b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.checkpoint.CheckpointId; /** @@ -150,6 +151,6 @@ public interface KeyValueStore<K, V> { * Create a persistent checkpoint / snapshot of the current store state and return it's path. * @return the path of the persistent store checkpoint, or an empty optional if checkpoints are not supported. */ - @InterfaceStability.Evolving - Optional<Path> checkpoint(String id); + @InterfaceStability.Unstable + Optional<Path> checkpoint(CheckpointId id); } diff --git a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java index 22e0fa9..b02f6c9 100644 --- a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java @@ -110,8 +110,8 @@ public class TaskConfig extends MapConfig { private static final boolean DEFAULT_TRANSACTIONAL_STATE_CHECKPOINT_ENABLED = true; public static final String TRANSACTIONAL_STATE_RESTORE_ENABLED = "task.transactional.state.restore.enabled"; private static final boolean DEFAULT_TRANSACTIONAL_STATE_RESTORE_ENABLED = false; - public static final String TRANSACTIONAL_STATE_RETAIN_EXISTING_CHANGELOG_STATE = - "task.transactional.state.retain.existing.changelog.state"; + public static final String TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE = + 
"task.transactional.state.retain.existing.state"; private static final boolean DEFAULT_TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE = true; public TaskConfig(Config config) { @@ -313,7 +313,7 @@ public class TaskConfig extends MapConfig { return getBoolean(TRANSACTIONAL_STATE_RESTORE_ENABLED, DEFAULT_TRANSACTIONAL_STATE_RESTORE_ENABLED); } - public boolean getTransactionalStateRetainExistingChangelogState() { - return getBoolean(TRANSACTIONAL_STATE_RETAIN_EXISTING_CHANGELOG_STATE, DEFAULT_TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE); + public boolean getTransactionalStateRetainExistingState() { + return getBoolean(TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, DEFAULT_TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE); } } diff --git a/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java b/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java index 2ad25eb..6a5ebf8 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java +++ b/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java @@ -19,6 +19,7 @@ package org.apache.samza.operators.util; +import org.apache.samza.checkpoint.CheckpointId; import org.apache.samza.storage.kv.Entry; import org.apache.samza.storage.kv.KeyValueSnapshot; import org.apache.samza.storage.kv.KeyValueIterator; @@ -140,7 +141,7 @@ public class InternalInMemoryStore<K, V> implements KeyValueStore<K, V> { } @Override - public Optional<Path> checkpoint(String id) { + public Optional<Path> checkpoint(CheckpointId id) { return Optional.empty(); } } diff --git a/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java b/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java index 4dd7f59..1e54ea1 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java +++ 
b/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java @@ -31,9 +31,12 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import org.apache.commons.lang3.StringUtils; import org.apache.samza.Partition; import org.apache.samza.SamzaException; +import org.apache.samza.checkpoint.CheckpointedChangelogOffset; import org.apache.samza.config.Config; +import org.apache.samza.config.StorageConfig; import org.apache.samza.config.TaskConfig; import org.apache.samza.container.TaskName; import org.apache.samza.job.model.TaskMode; @@ -230,8 +233,16 @@ public class TransactionalStateTaskRestoreManager implements TaskRestoreManager SystemStreamPartitionMetadata changelogSSPMetadata = currentChangelogOffsets.get(changelogSSP); String oldestOffset = changelogSSPMetadata.getOldestOffset(); String newestOffset = changelogSSPMetadata.getNewestOffset(); - String checkpointedOffset = checkpointedChangelogOffsets.get(changelogSSP); + String checkpointMessage = checkpointedChangelogOffsets.get(changelogSSP); + String checkpointedOffset = null; // can be null if no message, or message has null offset + long timeSinceLastCheckpointInMs = Long.MAX_VALUE; + if (StringUtils.isNotBlank(checkpointMessage)) { + CheckpointedChangelogOffset checkpointedChangelogOffset = CheckpointedChangelogOffset.fromString(checkpointMessage); + checkpointedOffset = checkpointedChangelogOffset.getOffset(); + timeSinceLastCheckpointInMs = System.currentTimeMillis() - + checkpointedChangelogOffset.getCheckpointId().getMillis(); + } Optional<File> currentDirOptional; Optional<List<File>> checkpointDirsOptional; @@ -255,21 +266,27 @@ public class TransactionalStateTaskRestoreManager implements TaskRestoreManager storeDirsToDelete.put(storeName, currentDir); }); - // first check if checkpointed offset is invalid (i.e., out of range of current offsets, or null) if (checkpointedOffset == null && oldestOffset != null) { // this can 
mean that either this is the initial migration for this feature and there are no previously // checkpointed changelog offsets, or that this is a new store or changelog topic after the initial migration. // if this is the first time migration, it might be desirable to retain existing data. - // if this is new store or topic, it's possible that the container previously died after writing some data to - // the changelog but before a commit, so it's desirable to delete the store, not restore anything and + // if this is new store or topic, it is possible that the container previously died after writing some data to + // the changelog but before a commit, so it is desirable to delete the store, not restore anything and // trim the changelog - // since we can't easily tell the difference b/w the two scenarios by just looking at the store and changelogs, + // since we can't tell the difference b/w the two scenarios by just looking at the store and changelogs, // we'll request users to indicate whether to retain existing data using a config flag. this flag should only // be set during migrations, and turned off after the first successful commit of the new container (i.e. next // deploy). for simplicity, we'll always delete the local store, and restore from changelog if necessary. + // the former scenario should not be common. the recommended way to opt-in to the transactional state feature + // is to first upgrade to the latest samza version but keep the transactional state restore config off. + // this will create the store checkpoint directories and write the changelog offset to the checkpoint, but + // will not use them during restore. once this is done (i.e. at least one commit after upgrade), the + // transactional state restore feature can be turned on on subsequent deploys. this code path exists as a + // fail-safe against clearing changelogs in case users do not follow upgrade instructions and enable the + // feature directly. 
checkpointDirsOptional.ifPresent(checkpointDirs -> checkpointDirs.forEach(checkpointDir -> { LOG.info("Marking checkpoint directory: {} for store: {} in task: {} for deletion since checkpointed " + @@ -278,7 +295,7 @@ public class TransactionalStateTaskRestoreManager implements TaskRestoreManager storeDirsToDelete.put(storeName, checkpointDir); })); - if (new TaskConfig(config).getTransactionalStateRetainExistingChangelogState()) { + if (new TaskConfig(config).getTransactionalStateRetainExistingState()) { // mark for restore from (oldest, newest) to recreate local state. LOG.warn("Checkpointed offset for store: {} in task: {} is null. Since retain existing state is true, " + "local state will be fully restored from current changelog contents. " + @@ -290,7 +307,7 @@ public class TransactionalStateTaskRestoreManager implements TaskRestoreManager // mark for restore from (oldest, null) to trim entire changelog. storesToRestore.put(storeName, new RestoreOffsets(oldestOffset, null)); } - } else if (// check if the checkpointed offset is in range of current oldest and newest offsets + } else if (// check if the checkpointed offset is out of range of current oldest and newest offsets admin.offsetComparator(oldestOffset, checkpointedOffset) > 0 || admin.offsetComparator(checkpointedOffset, newestOffset) > 0) { // checkpointed offset is out of range. this could mean that this is a TTL topic and the checkpointed @@ -312,12 +329,29 @@ public class TransactionalStateTaskRestoreManager implements TaskRestoreManager "will be fully restored from current changelog contents.", storeName); storesToRestore.put(storeName, new RestoreOffsets(oldestOffset, checkpointedOffset)); } else { // persistent logged store - // if there exists a valid store checkpoint directory with oldest offset <= local offset <= checkpointed offset, + String targetOffset; + + // check checkpoint time against min.compaction.lag.ms. if older, restore from checkpointed offset to newest + // with no trim. 
be conservative. allow 10% safety margin to avoid deletions when the downtime is close + // to min.compaction.lag.ms + long minCompactionLagMs = new StorageConfig(config).getChangelogMinCompactionLagMs(storeName); + if (timeSinceLastCheckpointInMs > .9 * minCompactionLagMs) { + LOG.warn("Checkpointed offset for store: {} in task: {} is: {}. It is in range of oldest: {} and " + + "newest: {} changelog offset. However, time since last checkpoint is: {}, which is greater than " + + "0.9 * min.compaction.lag.ms: {} for the changelog topic. Since there is a chance that" + + "the changelog topic has been compacted, restoring store to the end of the current changelog contents." + + "There is no transactional local state guarantee.", storeName, taskName, checkpointedOffset, + oldestOffset, newestOffset, timeSinceLastCheckpointInMs, minCompactionLagMs); + targetOffset = newestOffset; + } else { + targetOffset = checkpointedOffset; + } + + // if there exists a valid store checkpoint directory with oldest offset <= local offset <= target offset, // retain it and restore the delta. delete all other checkpoint directories for the store. if more than one such // checkpoint directory exists, retain the one with the highest local offset and delete the rest. boolean hasValidCheckpointDir = false; for (File checkpointDir: checkpointDirsOptional.get()) { - // TODO HIGH pmaheshw: should validation check / warn for compact lag config staleness too? 
if (storageManagerUtil.isLoggedStoreValid( storeName, checkpointDir, config, storeChangelogs, taskModel, clock, storeEngines)) { String localOffset = storageManagerUtil.readOffsetFile( @@ -326,17 +360,17 @@ public class TransactionalStateTaskRestoreManager implements TaskRestoreManager checkpointDir, taskName); if (admin.offsetComparator(localOffset, oldestOffset) >= 0 && - admin.offsetComparator(localOffset, checkpointedOffset) <= 0 && + admin.offsetComparator(localOffset, targetOffset) <= 0 && (storesToRestore.get(storeName) == null || admin.offsetComparator(localOffset, storesToRestore.get(storeName).startingOffset) > 0)) { hasValidCheckpointDir = true; LOG.info("Temporarily marking checkpoint dir: {} for store: {} in task: {} for retention. " + - "May be overridden later.", checkpointDir, storeName, taskName); + "May be overridden later.", checkpointDir, storeName, taskName); storeDirToRetain.put(storeName, checkpointDir); // mark for restore even if local == checkpointed, so that the changelog gets trimmed. LOG.info("Temporarily marking store: {} in task: {} for restore from beginning offset: {} to " + - "ending offset: {}. May be overridden later", storeName, taskName, localOffset, checkpointedOffset); - storesToRestore.put(storeName, new RestoreOffsets(localOffset, checkpointedOffset)); + "ending offset: {}. 
May be overridden later", storeName, taskName, localOffset, targetOffset); + storesToRestore.put(storeName, new RestoreOffsets(localOffset, targetOffset)); } } } @@ -353,7 +387,7 @@ public class TransactionalStateTaskRestoreManager implements TaskRestoreManager // if the store had not valid checkpoint dirs to retain, restore from changelog if (!hasValidCheckpointDir) { - storesToRestore.put(storeName, new RestoreOffsets(oldestOffset, checkpointedOffset)); + storesToRestore.put(storeName, new RestoreOffsets(oldestOffset, targetOffset)); } } } diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala index a87c535..2a4f1d6 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala @@ -24,7 +24,7 @@ import java.util.{Objects, Optional} import java.util.concurrent.ScheduledExecutorService import org.apache.samza.SamzaException -import org.apache.samza.checkpoint.{Checkpoint, OffsetManager} +import org.apache.samza.checkpoint.{Checkpoint, CheckpointId, CheckpointedChangelogOffset, OffsetManager} import org.apache.samza.config.{Config, StreamConfig, TaskConfig} import org.apache.samza.context._ import org.apache.samza.job.model.{JobModel, TaskModel} @@ -253,23 +253,26 @@ class TaskInstance( trace("Flushing state stores for taskName: %s" format taskName) newestChangelogOffsets = storageManager.flush() trace("Got newest changelog offsets for taskName: %s as: %s " format(taskName, newestChangelogOffsets)) + } + + val checkpointId = CheckpointId.create() + if (storageManager != null && newestChangelogOffsets != null) { + trace("Checkpointing stores for taskName: %s with checkpoint id: %s" format (taskName, checkpointId)) + storageManager.checkpoint(checkpointId, newestChangelogOffsets.toMap) + } + + if (newestChangelogOffsets != null) { newestChangelogOffsets.foreach {case 
(ssp, newestOffsetOption) => - allCheckpointOffsets.put(ssp, newestOffsetOption.orNull) + val offset = new CheckpointedChangelogOffset(checkpointId, newestOffsetOption.orNull).toString + allCheckpointOffsets.put(ssp, offset) } } - val checkpoint = new Checkpoint(allCheckpointOffsets) trace("Got combined checkpoint offsets for taskName: %s as: %s" format (taskName, allCheckpointOffsets)) - var checkpointId: String = null - if (storageManager != null && newestChangelogOffsets != null) { - trace("Checkpointing stores for taskName: %s" format taskName) - checkpointId = storageManager.checkpoint(newestChangelogOffsets.toMap) - } - offsetManager.writeCheckpoint(taskName, checkpoint) - if (storageManager != null && checkpointId != null) { + if (storageManager != null) { trace("Remove old checkpoint stores for taskName: %s" format taskName) storageManager.removeOldCheckpoints(checkpointId) } diff --git a/samza-core/src/main/scala/org/apache/samza/storage/NonTransactionalStateTaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/NonTransactionalStateTaskStorageManager.scala index cfca0f7..7b38749 100644 --- a/samza-core/src/main/scala/org/apache/samza/storage/NonTransactionalStateTaskStorageManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/storage/NonTransactionalStateTaskStorageManager.scala @@ -23,6 +23,7 @@ import java.io._ import com.google.common.annotations.VisibleForTesting import com.google.common.collect.ImmutableSet +import org.apache.samza.checkpoint.CheckpointId import org.apache.samza.container.TaskName import org.apache.samza.job.model.TaskMode import org.apache.samza.system._ @@ -57,11 +58,10 @@ class NonTransactionalStateTaskStorageManager( newestChangelogSSPOffsets } - def checkpoint(newestChangelogOffsets: Map[SystemStreamPartition, Option[String]]): String = { - null - } + override def checkpoint(checkpointId: CheckpointId, + newestChangelogOffsets: Map[SystemStreamPartition, Option[String]]): Unit = {} - override def 
removeOldCheckpoints(checkpointId: String): Unit = {} + override def removeOldCheckpoints(checkpointId: CheckpointId): Unit = {} @VisibleForTesting def stop() { diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala index c98461b..50d6418 100644 --- a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala @@ -19,6 +19,7 @@ package org.apache.samza.storage +import org.apache.samza.checkpoint.CheckpointId import org.apache.samza.system.SystemStreamPartition trait TaskStorageManager { @@ -27,9 +28,9 @@ trait TaskStorageManager { def flush(): Map[SystemStreamPartition, Option[String]] - def checkpoint(newestChangelogOffsets: Map[SystemStreamPartition, Option[String]]): String + def checkpoint(checkpointId: CheckpointId, newestChangelogOffsets: Map[SystemStreamPartition, Option[String]]): Unit - def removeOldCheckpoints(checkpointId: String): Unit + def removeOldCheckpoints(checkpointId: CheckpointId): Unit def stop(): Unit diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TransactionalStateTaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TransactionalStateTaskStorageManager.scala index c808866..20c7271 100644 --- a/samza-core/src/main/scala/org/apache/samza/storage/TransactionalStateTaskStorageManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/storage/TransactionalStateTaskStorageManager.scala @@ -26,6 +26,7 @@ import com.google.common.annotations.VisibleForTesting import com.google.common.collect.ImmutableSet import org.apache.commons.io.FileUtils import org.apache.commons.io.filefilter.WildcardFileFilter +import org.apache.samza.checkpoint.CheckpointId import org.apache.samza.{Partition, SamzaException} import org.apache.samza.container.TaskName import org.apache.samza.job.model.TaskMode @@ -56,15 
+57,14 @@ class TransactionalStateTaskStorageManager( getNewestChangelogSSPOffsets(taskName, storeChangelogs, partition, systemAdmins) } - def checkpoint(newestChangelogOffsets: Map[SystemStreamPartition, Option[String]]): String = { + def checkpoint(checkpointId: CheckpointId, newestChangelogOffsets: Map[SystemStreamPartition, Option[String]]): Unit = { debug("Checkpointing stores.") - val id = System.currentTimeMillis().toString val checkpointPaths = containerStorageManager.getAllStores(taskName).asScala .filter { case (storeName, storeEngine) => storeEngine.getStoreProperties.isLoggedStore && storeEngine.getStoreProperties.isPersistedToDisk} .flatMap { case (storeName, storeEngine) => { - val pathOptional = storeEngine.checkpoint(id) + val pathOptional = storeEngine.checkpoint(checkpointId) if (pathOptional.isPresent) { Some(storeName, pathOptional.get()) } else { @@ -74,11 +74,9 @@ class TransactionalStateTaskStorageManager( .toMap writeChangelogOffsetFiles(checkpointPaths, storeChangelogs, newestChangelogOffsets) - - id } - def removeOldCheckpoints(latestCheckpointId: String): Unit = { + def removeOldCheckpoints(latestCheckpointId: CheckpointId): Unit = { if (latestCheckpointId != null) { debug("Removing older checkpoints before " + latestCheckpointId) @@ -93,7 +91,7 @@ class TransactionalStateTaskStorageManager( val checkpointDirs = storeDir.listFiles(fileFilter) checkpointDirs - .filter(!_.getName.contains(latestCheckpointId)) + .filter(!_.getName.contains(latestCheckpointId.toString)) .foreach(checkpointDir => { FileUtils.deleteDirectory(checkpointDir) }) diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java index c742409..0022cc7 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java @@ -19,6 
+19,8 @@ package org.apache.samza.operators.impl.store; import com.google.common.primitives.UnsignedBytes; + +import org.apache.samza.checkpoint.CheckpointId; import org.apache.samza.serializers.Serde; import org.apache.samza.storage.kv.Entry; import org.apache.samza.storage.kv.KeyValueSnapshot; @@ -132,7 +134,7 @@ public class TestInMemoryStore<K, V> implements KeyValueStore<K, V> { } @Override - public Optional<Path> checkpoint(String id) { + public Optional<Path> checkpoint(CheckpointId id) { return Optional.empty(); } diff --git a/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java b/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java index 405abd7..2fdb81a 100644 --- a/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java +++ b/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java @@ -26,6 +26,7 @@ import java.util.Collections; import java.util.List; import java.util.Optional; +import org.apache.samza.checkpoint.CheckpointId; import org.apache.samza.system.ChangelogSSPIterator; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.SystemStreamPartition; @@ -63,7 +64,7 @@ public class MockStorageEngine implements StorageEngine { } @Override - public Optional<Path> checkpoint(String id) { + public Optional<Path> checkpoint(CheckpointId id) { return Optional.empty(); } diff --git a/samza-core/src/test/java/org/apache/samza/storage/TestTransactionalStateTaskRestoreManager.java b/samza-core/src/test/java/org/apache/samza/storage/TestTransactionalStateTaskRestoreManager.java index 789c0e7..879f8d5 100644 --- a/samza-core/src/test/java/org/apache/samza/storage/TestTransactionalStateTaskRestoreManager.java +++ b/samza-core/src/test/java/org/apache/samza/storage/TestTransactionalStateTaskRestoreManager.java @@ -31,6 +31,8 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; import org.apache.samza.Partition; +import 
org.apache.samza.checkpoint.CheckpointId; +import org.apache.samza.checkpoint.CheckpointedChangelogOffset; import org.apache.samza.config.Config; import org.apache.samza.config.MapConfig; import org.apache.samza.config.TaskConfig; @@ -180,8 +182,10 @@ public class TestTransactionalStateTaskRestoreManager { Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream); String changelog1CheckpointedOffset = "5"; + CheckpointedChangelogOffset changelog1CheckpointMessage = + new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset); ImmutableMap<SystemStreamPartition, String> mockCheckpointedChangelogOffset = - ImmutableMap.of(changelog1SSP, changelog1CheckpointedOffset); + ImmutableMap.of(changelog1SSP, changelog1CheckpointMessage.toString()); Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets = ImmutableMap.of(changelog1SSP, changelog1SSPMetadata); @@ -242,9 +246,11 @@ public class TestTransactionalStateTaskRestoreManager { Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream); String changelog1CheckpointedOffset = "21"; + CheckpointedChangelogOffset changelog1CheckpointMessage = + new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset); Map<SystemStreamPartition, String> mockCheckpointedChangelogOffset = new HashMap<SystemStreamPartition, String>() { { - put(changelog1SSP, changelog1CheckpointedOffset); + put(changelog1SSP, changelog1CheckpointMessage.toString()); } }; Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets = ImmutableMap.of(changelog1SSP, changelog1SSPMetadata); @@ -307,9 +313,79 @@ public class TestTransactionalStateTaskRestoreManager { Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream); String changelog1CheckpointedOffset = "5"; + CheckpointedChangelogOffset changelog1CheckpointMessage = + new 
CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset); Map<SystemStreamPartition, String> mockCheckpointedChangelogOffset = new HashMap<SystemStreamPartition, String>() { { - put(changelog1SSP, changelog1CheckpointedOffset); + put(changelog1SSP, changelog1CheckpointMessage.toString()); + } }; + Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets = + ImmutableMap.of(changelog1SSP, changelog1SSPMetadata); + + SystemAdmins mockSystemAdmins = mock(SystemAdmins.class); + SystemAdmin mockSystemAdmin = mock(SystemAdmin.class); + when(mockSystemAdmins.getSystemAdmin(changelog1SSP.getSystem())).thenReturn(mockSystemAdmin); + StorageManagerUtil mockStorageManagerUtil = mock(StorageManagerUtil.class); + File mockLoggedStoreBaseDir = mock(File.class); + File mockNonLoggedStoreBaseDir = mock(File.class); + Config mockConfig = mock(Config.class); + Clock mockClock = mock(Clock.class); + + Mockito.when(mockSystemAdmin.offsetComparator(anyString(), anyString())) + .thenAnswer((Answer<Integer>) invocation -> { + String offset1 = (String) invocation.getArguments()[0]; + String offset2 = (String) invocation.getArguments()[1]; + return Long.valueOf(offset1).compareTo(Long.valueOf(offset2)); + }); + + StoreActions storeActions = TransactionalStateTaskRestoreManager.getStoreActions( + mockTaskModel, mockStoreEngines, mockStoreChangelogs, mockCheckpointedChangelogOffset, + mockCurrentChangelogOffsets, mockSystemAdmins, mockStorageManagerUtil, + mockLoggedStoreBaseDir, mockNonLoggedStoreBaseDir, mockConfig, mockClock); + + // ensure that there is nothing to retain or delete + assertEquals(0, storeActions.storeDirsToDelete.size()); + assertEquals(0, storeActions.storeDirsToRetain.size()); + // ensure that we mark the store for full restore (from current oldest to current newest) + assertEquals("10", storeActions.storesToRestore.get(store1Name).startingOffset); + assertEquals("20", 
storeActions.storesToRestore.get(store1Name).endingOffset); + } + + /** + * This can happen if the changelog offset is valid but the checkpoint is older than min compaction lag ms. E.g., when + * the job/container shut down and restarted after a long time. + */ + @Test + public void testGetStoreActionsForLoggedNonPersistentStore_FullRestoreIfCheckpointedOffsetInRangeButMaybeCompacted() { + TaskModel mockTaskModel = mock(TaskModel.class); + TaskName taskName = new TaskName("Partition 0"); + when(mockTaskModel.getTaskName()).thenReturn(taskName); + Partition taskChangelogPartition = new Partition(0); + when(mockTaskModel.getChangelogPartition()).thenReturn(taskChangelogPartition); + + String store1Name = "store1"; + StorageEngine store1Engine = mock(StorageEngine.class); + StoreProperties mockStore1Properties = mock(StoreProperties.class); + when(store1Engine.getStoreProperties()).thenReturn(mockStore1Properties); + when(mockStore1Properties.isLoggedStore()).thenReturn(true); + when(mockStore1Properties.isPersistedToDisk()).thenReturn(false); // non-persistent store + Map<String, StorageEngine> mockStoreEngines = ImmutableMap.of(store1Name, store1Engine); + + String changelog1SystemName = "system1"; + String changelog1StreamName = "store1Changelog"; + SystemStream changelog1SystemStream = new SystemStream(changelog1SystemName, changelog1StreamName); + SystemStreamPartition changelog1SSP = new SystemStreamPartition(changelog1SystemStream, taskChangelogPartition); + // checkpointed changelog offset > newest offset (e.g. 
changelog topic got changed) + SystemStreamPartitionMetadata changelog1SSPMetadata = new SystemStreamPartitionMetadata("10", "20", "21"); + Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream); + + String changelog1CheckpointedOffset = "5"; + CheckpointId checkpointId = CheckpointId.fromString("0-0"); // checkpoint id older than default min.compaction.lag.ms + CheckpointedChangelogOffset changelog1CheckpointMessage = + new CheckpointedChangelogOffset(checkpointId, changelog1CheckpointedOffset); + Map<SystemStreamPartition, String> mockCheckpointedChangelogOffset = + new HashMap<SystemStreamPartition, String>() { { + put(changelog1SSP, changelog1CheckpointMessage.toString()); } }; Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets = ImmutableMap.of(changelog1SSP, changelog1SSPMetadata); @@ -375,9 +451,11 @@ public class TestTransactionalStateTaskRestoreManager { Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream); String changelog1CheckpointedOffset = null; + CheckpointedChangelogOffset changelog1CheckpointMessage = + new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset); Map<SystemStreamPartition, String> mockCheckpointedChangelogOffset = new HashMap<SystemStreamPartition, String>() { { - put(changelog1SSP, changelog1CheckpointedOffset); + put(changelog1SSP, changelog1CheckpointMessage.toString()); } }; Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets = ImmutableMap.of(changelog1SSP, changelog1SSPMetadata); @@ -389,7 +467,7 @@ public class TestTransactionalStateTaskRestoreManager { File mockLoggedStoreBaseDir = mock(File.class); File mockNonLoggedStoreBaseDir = mock(File.class); HashMap<String, String> configMap = new HashMap<>(); - configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_CHANGELOG_STATE, "false"); + 
configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "false"); Config mockConfig = new MapConfig(configMap); Clock mockClock = mock(Clock.class); @@ -437,9 +515,11 @@ public class TestTransactionalStateTaskRestoreManager { Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream); String changelog1CheckpointedOffset = null; + CheckpointedChangelogOffset changelog1CheckpointMessage = + new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset); Map<SystemStreamPartition, String> mockCheckpointedChangelogOffset = new HashMap<SystemStreamPartition, String>() { { - put(changelog1SSP, changelog1CheckpointedOffset); + put(changelog1SSP, changelog1CheckpointMessage.toString()); } }; Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets = ImmutableMap.of(changelog1SSP, changelog1SSPMetadata); @@ -451,7 +531,7 @@ public class TestTransactionalStateTaskRestoreManager { File mockLoggedStoreBaseDir = mock(File.class); File mockNonLoggedStoreBaseDir = mock(File.class); HashMap<String, String> configMap = new HashMap<>(); - configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_CHANGELOG_STATE, "true"); + configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "true"); Config mockConfig = new MapConfig(configMap); Clock mockClock = mock(Clock.class); @@ -499,8 +579,10 @@ public class TestTransactionalStateTaskRestoreManager { Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream); String changelog1CheckpointedOffset = "5"; + CheckpointedChangelogOffset changelog1CheckpointMessage = + new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset); ImmutableMap<SystemStreamPartition, String> mockCheckpointedChangelogOffset = - ImmutableMap.of(changelog1SSP, changelog1CheckpointedOffset); + ImmutableMap.of(changelog1SSP, changelog1CheckpointMessage.toString()); 
Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets = ImmutableMap.of(changelog1SSP, changelog1SSPMetadata); @@ -562,8 +644,10 @@ public class TestTransactionalStateTaskRestoreManager { Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream); String changelog1CheckpointedOffset = "5"; + CheckpointedChangelogOffset changelog1CheckpointMessage = + new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset); ImmutableMap<SystemStreamPartition, String> mockCheckpointedChangelogOffset = - ImmutableMap.of(changelog1SSP, changelog1CheckpointedOffset); + ImmutableMap.of(changelog1SSP, changelog1CheckpointMessage.toString()); Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets = ImmutableMap.of(changelog1SSP, changelog1SSPMetadata); @@ -643,8 +727,10 @@ public class TestTransactionalStateTaskRestoreManager { Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream); String changelog1CheckpointedOffset = "5"; + CheckpointedChangelogOffset changelog1CheckpointMessage = + new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset); ImmutableMap<SystemStreamPartition, String> mockCheckpointedChangelogOffset = - ImmutableMap.of(changelog1SSP, changelog1CheckpointedOffset); + ImmutableMap.of(changelog1SSP, changelog1CheckpointMessage.toString()); Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets = ImmutableMap.of(changelog1SSP, changelog1SSPMetadata); @@ -727,8 +813,10 @@ public class TestTransactionalStateTaskRestoreManager { Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream); String changelog1CheckpointedOffset = "5"; + CheckpointedChangelogOffset changelog1CheckpointMessage = + new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset); 
ImmutableMap<SystemStreamPartition, String> mockCheckpointedChangelogOffset = - ImmutableMap.of(changelog1SSP, changelog1CheckpointedOffset); + ImmutableMap.of(changelog1SSP, changelog1CheckpointMessage.toString()); Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets = ImmutableMap.of(changelog1SSP, changelog1SSPMetadata); @@ -810,8 +898,10 @@ public class TestTransactionalStateTaskRestoreManager { Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream); String changelog1CheckpointedOffset = "5"; + CheckpointedChangelogOffset changelog1CheckpointMessage = + new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset); ImmutableMap<SystemStreamPartition, String> mockCheckpointedChangelogOffset = - ImmutableMap.of(changelog1SSP, changelog1CheckpointedOffset); + ImmutableMap.of(changelog1SSP, changelog1CheckpointMessage.toString()); Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets = ImmutableMap.of(changelog1SSP, changelog1SSPMetadata); @@ -893,8 +983,10 @@ public class TestTransactionalStateTaskRestoreManager { Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream); String changelog1CheckpointedOffset = "5"; + CheckpointedChangelogOffset changelog1CheckpointMessage = + new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset); ImmutableMap<SystemStreamPartition, String> mockCheckpointedChangelogOffset = - ImmutableMap.of(changelog1SSP, changelog1CheckpointedOffset); + ImmutableMap.of(changelog1SSP, changelog1CheckpointMessage.toString()); Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets = ImmutableMap.of(changelog1SSP, changelog1SSPMetadata); @@ -979,9 +1071,11 @@ public class TestTransactionalStateTaskRestoreManager { Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream); 
String changelog1CheckpointedOffset = null; + CheckpointedChangelogOffset changelog1CheckpointMessage = + new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset); Map<SystemStreamPartition, String> mockCheckpointedChangelogOffset = new HashMap<SystemStreamPartition, String>() { { - put(changelog1SSP, changelog1CheckpointedOffset); + put(changelog1SSP, changelog1CheckpointMessage.toString()); } }; Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets = ImmutableMap.of(changelog1SSP, changelog1SSPMetadata); @@ -993,7 +1087,7 @@ public class TestTransactionalStateTaskRestoreManager { File mockLoggedStoreBaseDir = mock(File.class); File mockNonLoggedStoreBaseDir = mock(File.class); HashMap<String, String> configMap = new HashMap<>(); - configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_CHANGELOG_STATE, "true"); + configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "true"); Config mockConfig = new MapConfig(configMap); Clock mockClock = mock(Clock.class); @@ -1072,9 +1166,11 @@ public class TestTransactionalStateTaskRestoreManager { Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream); String changelog1CheckpointedOffset = null; + CheckpointedChangelogOffset changelog1CheckpointMessage = + new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset); Map<SystemStreamPartition, String> mockCheckpointedChangelogOffset = new HashMap<SystemStreamPartition, String>() { { - put(changelog1SSP, changelog1CheckpointedOffset); + put(changelog1SSP, changelog1CheckpointMessage.toString()); } }; Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets = ImmutableMap.of(changelog1SSP, changelog1SSPMetadata); @@ -1086,7 +1182,7 @@ public class TestTransactionalStateTaskRestoreManager { File mockLoggedStoreBaseDir = mock(File.class); File mockNonLoggedStoreBaseDir = mock(File.class); HashMap<String, 
String> configMap = new HashMap<>(); - configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_CHANGELOG_STATE, "false"); + configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "false"); Config mockConfig = new MapConfig(configMap); Clock mockClock = mock(Clock.class); @@ -1163,9 +1259,11 @@ public class TestTransactionalStateTaskRestoreManager { Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream); String changelog1CheckpointedOffset = "5"; + CheckpointedChangelogOffset changelog1CheckpointMessage = + new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset); Map<SystemStreamPartition, String> mockCheckpointedChangelogOffset = new HashMap<SystemStreamPartition, String>() { { - put(changelog1SSP, changelog1CheckpointedOffset); + put(changelog1SSP, changelog1CheckpointMessage.toString()); } }; Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets = ImmutableMap.of(changelog1SSP, changelog1SSPMetadata); @@ -1177,7 +1275,7 @@ public class TestTransactionalStateTaskRestoreManager { File mockLoggedStoreBaseDir = mock(File.class); File mockNonLoggedStoreBaseDir = mock(File.class); HashMap<String, String> configMap = new HashMap<>(); - configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_CHANGELOG_STATE, "true"); // should not matter + configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "true"); // should not matter Config mockConfig = new MapConfig(configMap); Clock mockClock = mock(Clock.class); @@ -1250,9 +1348,11 @@ public class TestTransactionalStateTaskRestoreManager { Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream); String changelog1CheckpointedOffset = null; + CheckpointedChangelogOffset changelog1CheckpointMessage = + new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset); Map<SystemStreamPartition, String> 
mockCheckpointedChangelogOffset = new HashMap<SystemStreamPartition, String>() { { - put(changelog1SSP, changelog1CheckpointedOffset); + put(changelog1SSP, changelog1CheckpointMessage.toString()); } }; Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets = ImmutableMap.of(changelog1SSP, changelog1SSPMetadata); @@ -1264,7 +1364,7 @@ public class TestTransactionalStateTaskRestoreManager { File mockLoggedStoreBaseDir = mock(File.class); File mockNonLoggedStoreBaseDir = mock(File.class); HashMap<String, String> configMap = new HashMap<>(); - configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_CHANGELOG_STATE, "true"); // should not matter + configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "true"); // should not matter Config mockConfig = new MapConfig(configMap); Clock mockClock = mock(Clock.class); @@ -1343,9 +1443,11 @@ public class TestTransactionalStateTaskRestoreManager { Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream); String changelog1CheckpointedOffset = "5"; + CheckpointedChangelogOffset changelog1CheckpointMessage = + new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset); Map<SystemStreamPartition, String> mockCheckpointedChangelogOffset = new HashMap<SystemStreamPartition, String>() { { - put(changelog1SSP, changelog1CheckpointedOffset); + put(changelog1SSP, changelog1CheckpointMessage.toString()); } }; Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets = ImmutableMap.of(changelog1SSP, changelog1SSPMetadata); @@ -1403,6 +1505,97 @@ public class TestTransactionalStateTaskRestoreManager { } /** + * This can happen if the changelog offset is valid but the checkpoint is older than min compaction lag ms. E.g., when + * the job/container shut down and restarted after a long time. 
+ */ + @Test + public void testGetStoreActionsForLoggedPersistentStore_RestoreFromLocalToNewestIfCheckpointedOffsetInRangeButMaybeCompacted() { + TaskModel mockTaskModel = mock(TaskModel.class); + TaskName taskName = new TaskName("Partition 0"); + when(mockTaskModel.getTaskName()).thenReturn(taskName); + Partition taskChangelogPartition = new Partition(0); + when(mockTaskModel.getChangelogPartition()).thenReturn(taskChangelogPartition); + + String store1Name = "store1"; + StorageEngine store1Engine = mock(StorageEngine.class); + StoreProperties mockStore1Properties = mock(StoreProperties.class); + when(store1Engine.getStoreProperties()).thenReturn(mockStore1Properties); + when(mockStore1Properties.isLoggedStore()).thenReturn(true); + when(mockStore1Properties.isPersistedToDisk()).thenReturn(true); + Map<String, StorageEngine> mockStoreEngines = ImmutableMap.of(store1Name, store1Engine); + + String changelog1SystemName = "system1"; + String changelog1StreamName = "store1Changelog"; + SystemStream changelog1SystemStream = new SystemStream(changelog1SystemName, changelog1StreamName); + SystemStreamPartition changelog1SSP = new SystemStreamPartition(changelog1SystemStream, taskChangelogPartition); + // checkpointed changelog offset is valid + SystemStreamPartitionMetadata changelog1SSPMetadata = new SystemStreamPartitionMetadata("4", "20", "21"); + Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream); + + String changelog1CheckpointedOffset = "5"; + CheckpointId checkpointId = CheckpointId.fromString("0-0"); // checkpoint timestamp older than default min compaction lag + CheckpointedChangelogOffset changelog1CheckpointMessage = + new CheckpointedChangelogOffset(checkpointId, changelog1CheckpointedOffset); + Map<SystemStreamPartition, String> mockCheckpointedChangelogOffset = + new HashMap<SystemStreamPartition, String>() { { + put(changelog1SSP, changelog1CheckpointMessage.toString()); + } }; + Map<SystemStreamPartition, 
SystemStreamPartitionMetadata> mockCurrentChangelogOffsets = + ImmutableMap.of(changelog1SSP, changelog1SSPMetadata); + + SystemAdmins mockSystemAdmins = mock(SystemAdmins.class); + SystemAdmin mockSystemAdmin = mock(SystemAdmin.class); + when(mockSystemAdmins.getSystemAdmin(changelog1SSP.getSystem())).thenReturn(mockSystemAdmin); + StorageManagerUtil mockStorageManagerUtil = mock(StorageManagerUtil.class); + File mockLoggedStoreBaseDir = mock(File.class); + File mockNonLoggedStoreBaseDir = mock(File.class); + Config mockConfig = mock(Config.class); + Clock mockClock = mock(Clock.class); + + File mockCurrentStoreDir = mock(File.class); + File mockStoreNewerCheckpointDir = mock(File.class); + File mockStoreOlderCheckpointDir = mock(File.class); + String olderCheckpointDirLocalOffset = "3"; + String newerCheckpointDirLocalOffset = "5"; + when(mockStorageManagerUtil.getTaskStoreDir(eq(mockLoggedStoreBaseDir), eq(store1Name), eq(taskName), any())) + .thenReturn(mockCurrentStoreDir); + when(mockStorageManagerUtil.getTaskStoreCheckpointDirs(eq(mockLoggedStoreBaseDir), eq(store1Name), eq(taskName), any())) + .thenReturn(ImmutableList.of(mockStoreNewerCheckpointDir, mockStoreOlderCheckpointDir)); + when(mockStorageManagerUtil.isLoggedStoreValid(eq(store1Name), eq(mockStoreNewerCheckpointDir), any(), + eq(mockStoreChangelogs), eq(mockTaskModel), any(), eq(mockStoreEngines))).thenReturn(true); + when(mockStorageManagerUtil.isLoggedStoreValid(eq(store1Name), eq(mockStoreOlderCheckpointDir), any(), + eq(mockStoreChangelogs), eq(mockTaskModel), any(), eq(mockStoreEngines))).thenReturn(true); + Set<SystemStreamPartition> mockChangelogSSPs = ImmutableSet.of(changelog1SSP); + when(mockStorageManagerUtil.readOffsetFile(eq(mockStoreNewerCheckpointDir), eq(mockChangelogSSPs), eq(false))) + .thenReturn(ImmutableMap.of(changelog1SSP, newerCheckpointDirLocalOffset)); + when(mockStorageManagerUtil.readOffsetFile(eq(mockStoreOlderCheckpointDir), eq(mockChangelogSSPs), eq(false))) + 
.thenReturn(ImmutableMap.of(changelog1SSP, olderCheckpointDirLocalOffset)); // less than checkpointed offset (5) + + Mockito.when(mockSystemAdmin.offsetComparator(anyString(), anyString())) + .thenAnswer((Answer<Integer>) invocation -> { + String offset1 = (String) invocation.getArguments()[0]; + String offset2 = (String) invocation.getArguments()[1]; + return Long.valueOf(offset1).compareTo(Long.valueOf(offset2)); + }); + + StoreActions storeActions = TransactionalStateTaskRestoreManager.getStoreActions( + mockTaskModel, mockStoreEngines, mockStoreChangelogs, mockCheckpointedChangelogOffset, + mockCurrentChangelogOffsets, mockSystemAdmins, mockStorageManagerUtil, + mockLoggedStoreBaseDir, mockNonLoggedStoreBaseDir, mockConfig, mockClock); + + // ensure that the current store dir and older checkpoint dir are marked for deletion + assertEquals(2, storeActions.storeDirsToDelete.get(store1Name).size()); + assertTrue(storeActions.storeDirsToDelete.get(store1Name).contains(mockCurrentStoreDir)); + assertTrue(storeActions.storeDirsToDelete.get(store1Name).contains(mockStoreOlderCheckpointDir)); + // ensure that newer checkpoint dir is retained + assertEquals(1, storeActions.storeDirsToRetain.size()); + assertEquals(mockStoreNewerCheckpointDir, storeActions.storeDirsToRetain.get(store1Name)); + // ensure that we mark the store for restore to head (from local checkpoint to current newest) + assertEquals("5", storeActions.storesToRestore.get(store1Name).startingOffset); + assertEquals("20", storeActions.storesToRestore.get(store1Name).endingOffset); + } + + /** * This can happen if the changelog topic was manually deleted and recreated, and the checkpointed/local changelog * offset is not valid anymore. 
*/ @@ -1431,9 +1624,11 @@ public class TestTransactionalStateTaskRestoreManager { Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream); String changelog1CheckpointedOffset = "21"; + CheckpointedChangelogOffset changelog1CheckpointMessage = + new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset); Map<SystemStreamPartition, String> mockCheckpointedChangelogOffset = new HashMap<SystemStreamPartition, String>() { { - put(changelog1SSP, changelog1CheckpointedOffset); + put(changelog1SSP, changelog1CheckpointMessage.toString()); } }; Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets = ImmutableMap.of(changelog1SSP, changelog1SSPMetadata); diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala index 85cccf8..a54ae72 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala @@ -22,7 +22,7 @@ package org.apache.samza.container import java.util.Collections import org.apache.samza.{Partition, SamzaException} -import org.apache.samza.checkpoint.{Checkpoint, OffsetManager} +import org.apache.samza.checkpoint.{Checkpoint, CheckpointedChangelogOffset, OffsetManager} import org.apache.samza.config.MapConfig import org.apache.samza.context.{TaskContext => _, _} import org.apache.samza.job.model.TaskModel @@ -216,10 +216,9 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar { val inputOffsets = new Checkpoint(Map(SYSTEM_STREAM_PARTITION -> "4").asJava) val changelogSSP = new SystemStreamPartition(new SystemStream(SYSTEM_NAME, "test-changelog-stream"), new Partition(0)) val changelogOffsets = Map(changelogSSP -> Some("5")) - val checkpointId = "1234" when(this.offsetManager.buildCheckpoint(TASK_NAME)).thenReturn(inputOffsets) 
when(this.taskStorageManager.flush()).thenReturn(changelogOffsets) - when(this.taskStorageManager.checkpoint(any[Map[SystemStreamPartition, Option[String]]])).thenReturn(checkpointId) + doNothing().when(this.taskStorageManager).checkpoint(any(), any[Map[SystemStreamPartition, Option[String]]]) taskInstance.commit val mockOrder = inOrder(this.offsetManager, this.collector, this.taskTableManager, this.taskStorageManager) @@ -238,7 +237,7 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar { mockOrder.verify(this.taskStorageManager).flush() // Stores checkpoints should be created next with the newest changelog offsets - mockOrder.verify(this.taskStorageManager).checkpoint(changelogOffsets) + mockOrder.verify(this.taskStorageManager).checkpoint(any(), Matchers.eq(changelogOffsets)) // Input checkpoint should be written with the snapshot captured at the beginning of commit and the // newest changelog offset captured during storage manager flush @@ -246,10 +245,10 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar { mockOrder.verify(offsetManager).writeCheckpoint(any(), captor.capture) val cp = captor.getValue assertEquals("4", cp.getOffsets.get(SYSTEM_STREAM_PARTITION)) - assertEquals("5", cp.getOffsets.get(changelogSSP)) + assertEquals("5", CheckpointedChangelogOffset.fromString(cp.getOffsets.get(changelogSSP)).getOffset) // Old checkpointed stores should be cleared - mockOrder.verify(this.taskStorageManager).removeOldCheckpoints(checkpointId) + mockOrder.verify(this.taskStorageManager).removeOldCheckpoints(any()) verify(commitsCounter).inc() } @@ -269,7 +268,10 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar { verify(offsetManager).writeCheckpoint(any(), captor.capture) val cp = captor.getValue assertEquals("4", cp.getOffsets.get(SYSTEM_STREAM_PARTITION)) - assertEquals(null, cp.getOffsets.get(changelogSSP)) + val message = cp.getOffsets.get(changelogSSP) + val checkpointedOffset = 
CheckpointedChangelogOffset.fromString(message) + assertNull(checkpointedOffset.getOffset) + assertNotNull(checkpointedOffset.getCheckpointId) verify(commitsCounter).inc() } @@ -320,7 +322,7 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar { val inputOffsets = new Checkpoint(Map(SYSTEM_STREAM_PARTITION -> "4").asJava) when(this.offsetManager.buildCheckpoint(TASK_NAME)).thenReturn(inputOffsets) when(this.taskStorageManager.flush()).thenReturn(Map[SystemStreamPartition, Option[String]]()) - when(this.taskStorageManager.checkpoint(any())).thenThrow(new SamzaException("Error creating store checkpoint")) + when(this.taskStorageManager.checkpoint(any(), any())).thenThrow(new SamzaException("Error creating store checkpoint")) try { taskInstance.commit @@ -341,8 +343,8 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar { val inputOffsets = new Checkpoint(Map(SYSTEM_STREAM_PARTITION -> "4").asJava) when(this.offsetManager.buildCheckpoint(TASK_NAME)).thenReturn(inputOffsets) when(this.taskStorageManager.flush()).thenReturn(Map[SystemStreamPartition, Option[String]]()) - when(this.taskStorageManager.checkpoint(any())).thenReturn("id") - when(this.taskStorageManager.removeOldCheckpoints("id")) + doNothing().when(this.taskStorageManager).checkpoint(any(), any()) + when(this.taskStorageManager.removeOldCheckpoints(any())) .thenThrow(new SamzaException("Error clearing old checkpoints")) try { diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java index 8f0fbb3..43872e9 100644 --- a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java +++ b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java @@ -171,7 +171,7 @@ public class TestContainerStorageManager { configMap.put("stores." + STORE_NAME + ".key.serde", "stringserde"); configMap.put("stores." 
+ STORE_NAME + ".msg.serde", "stringserde"); configMap.put("serializers.registry.stringserde.class", StringSerdeFactory.class.getName()); - configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_CHANGELOG_STATE, "true"); + configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "true"); Config config = new MapConfig(configMap); Map<String, Serde<Object>> serdes = new HashMap<>(); diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestTransactionalStateTaskStorageManager.java b/samza-core/src/test/scala/org/apache/samza/storage/TestTransactionalStateTaskStorageManager.java index 69a2379..f2d4972 100644 --- a/samza-core/src/test/scala/org/apache/samza/storage/TestTransactionalStateTaskStorageManager.java +++ b/samza-core/src/test/scala/org/apache/samza/storage/TestTransactionalStateTaskStorageManager.java @@ -32,6 +32,7 @@ import java.util.HashMap; import java.util.Optional; import org.apache.samza.Partition; import org.apache.samza.SamzaException; +import org.apache.samza.checkpoint.CheckpointId; import org.apache.samza.container.TaskName; import org.apache.samza.job.model.TaskMode; import org.apache.samza.system.SystemAdmin; @@ -49,7 +50,6 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; -import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; @@ -272,7 +272,7 @@ public class TestTransactionalStateTaskStorageManager { when(lpStoreProps.isPersistedToDisk()).thenReturn(true); when(lpStoreProps.isLoggedStore()).thenReturn(true); Path mockPath = mock(Path.class); - when(mockLPStore.checkpoint(anyString())).thenReturn(Optional.of(mockPath)); + when(mockLPStore.checkpoint(any())).thenReturn(Optional.of(mockPath)); StorageEngine mockPStore = mock(StorageEngine.class); StoreProperties pStoreProps = 
mock(StoreProperties.class); @@ -309,14 +309,14 @@ public class TestTransactionalStateTaskStorageManager { ImmutableMap.of(mock(SystemStreamPartition.class), Option.apply("1"))); // invoke checkpoint - tsm.checkpoint(offsets); + tsm.checkpoint(CheckpointId.create(), offsets); // ensure that checkpoint is never called for non-logged persistent stores since they're // always cleared on restart. - verify(mockPStore, never()).checkpoint(anyString()); + verify(mockPStore, never()).checkpoint(any()); // ensure that checkpoint is never called for in-memory stores since they're not persistent. - verify(mockIStore, never()).checkpoint(anyString()); - verify(mockLIStore, never()).checkpoint(anyString()); + verify(mockIStore, never()).checkpoint(any()); + verify(mockLIStore, never()).checkpoint(any()); verify(tsm).writeChangelogOffsetFiles(checkpointPathsCaptor.capture(), any(), eq(offsets)); Map<String, Path> checkpointPaths = checkpointPathsCaptor.getValue(); assertEquals(1, checkpointPaths.size()); @@ -332,7 +332,7 @@ public class TestTransactionalStateTaskStorageManager { when(mockLPStore.getStoreProperties()).thenReturn(lpStoreProps); when(lpStoreProps.isPersistedToDisk()).thenReturn(true); when(lpStoreProps.isLoggedStore()).thenReturn(true); - when(mockLPStore.checkpoint(anyString())).thenThrow(new IllegalStateException()); + when(mockLPStore.checkpoint(any())).thenThrow(new IllegalStateException()); java.util.Map<String, StorageEngine> taskStores = ImmutableMap.of("loggedPersistentStore", mockLPStore); when(csm.getAllStores(any())).thenReturn(taskStores); @@ -343,7 +343,7 @@ public class TestTransactionalStateTaskStorageManager { ImmutableMap.of(mock(SystemStreamPartition.class), Option.apply("1"))); // invoke checkpoint - tsm.checkpoint(offsets); + tsm.checkpoint(CheckpointId.create(), offsets); verify(tsm, never()).writeChangelogOffsetFiles(any(), any(), any()); fail("Should have thrown an exception if error creating store checkpoint"); } @@ -358,7 +358,7 @@ public 
class TestTransactionalStateTaskStorageManager { when(lpStoreProps.isPersistedToDisk()).thenReturn(true); when(lpStoreProps.isLoggedStore()).thenReturn(true); Path mockPath = mock(Path.class); - when(mockLPStore.checkpoint(anyString())).thenReturn(Optional.of(mockPath)); + when(mockLPStore.checkpoint(any())).thenReturn(Optional.of(mockPath)); java.util.Map<String, StorageEngine> taskStores = ImmutableMap.of("loggedPersistentStore", mockLPStore); when(csm.getAllStores(any())).thenReturn(taskStores); @@ -371,7 +371,7 @@ public class TestTransactionalStateTaskStorageManager { ImmutableMap.of(mock(SystemStreamPartition.class), Option.apply("1"))); // invoke checkpoint - tsm.checkpoint(offsets); + tsm.checkpoint(CheckpointId.create(), offsets); fail("Should have thrown an exception if error writing offset file."); } diff --git a/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala index d482c15..2d26d29 100644 --- a/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala +++ b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala @@ -25,6 +25,8 @@ import java.nio.file.Path import java.util import java.util.Optional +import org.apache.samza.checkpoint.CheckpointId + /** * In memory implementation of a key value store. * @@ -127,7 +129,7 @@ class InMemoryKeyValueStore(val metrics: KeyValueStoreMetrics = new KeyValueStor } } - override def checkpoint(id: String): Optional[Path] = { + override def checkpoint(id: CheckpointId): Optional[Path] = { // No checkpoint being persisted. State restores from Changelog. 
Optional.empty() } diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala index a2ae8b0..300177a 100644 --- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala +++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala @@ -21,15 +21,13 @@ package org.apache.samza.storage.kv import java.io.File import java.nio.file.{Path, Paths} -import java.util -import java.util.{Comparator, Optional} import java.util.concurrent.TimeUnit import java.util.concurrent.locks.ReentrantReadWriteLock +import java.util.{Comparator, Optional} -import org.apache.commons.io.FileUtils -import org.apache.samza.{SamzaException, checkpoint} +import org.apache.samza.SamzaException +import org.apache.samza.checkpoint.CheckpointId import org.apache.samza.config.Config -import org.apache.samza.serializers.CheckpointSerde import org.apache.samza.util.Logging import org.rocksdb.{TtlDB, _} @@ -239,9 +237,9 @@ class RocksDbKeyValueStore( trace("Flushed store: %s" format storeName) } - override def checkpoint(id: String): Optional[Path] = { + override def checkpoint(id: CheckpointId): Optional[Path] = { val checkpoint = Checkpoint.create(db) - val checkpointPath = dir.getPath + "-" + id + val checkpointPath = dir.getPath + "-" + id.toString checkpoint.createCheckpoint(checkpointPath) Optional.of(Paths.get(checkpointPath)) } diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LargeMessageSafeStore.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LargeMessageSafeStore.java index 7e514e7..177a986 100644 --- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LargeMessageSafeStore.java +++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LargeMessageSafeStore.java @@ -22,6 +22,7 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.List; import 
java.util.Optional; +import org.apache.samza.checkpoint.CheckpointId; import org.apache.samza.metrics.MetricsRegistryMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -126,7 +127,7 @@ public class LargeMessageSafeStore implements KeyValueStore<byte[], byte[]> { } @Override - public Optional<Path> checkpoint(String id) { + public Optional<Path> checkpoint(CheckpointId id) { return store.checkpoint(id); } diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala index ace7aa5..8c32793 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala @@ -24,6 +24,7 @@ import java.nio.file.Path import java.util import java.util.Optional +import org.apache.samza.checkpoint.CheckpointId import org.apache.samza.config.StorageConfig import org.apache.samza.task.MessageCollector import org.apache.samza.util.Logging @@ -163,7 +164,7 @@ class AccessLoggedStore[K, V]( bytes } - override def checkpoint(id: String): Optional[Path] = { + override def checkpoint(id: CheckpointId): Optional[Path] = { store.checkpoint(id) } } diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala index 41d2d9f..5c1961c 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala @@ -20,10 +20,13 @@ package org.apache.samza.storage.kv import org.apache.samza.util.Logging + import scala.collection._ import java.nio.file.Path import java.util.{Arrays, Optional} +import org.apache.samza.checkpoint.CheckpointId + /** * A write-behind caching layer around the rocksdb store. The purpose of this cache is three-fold: * 1. 
Batch together writes to rocksdb, this turns out to be a great optimization @@ -293,7 +296,7 @@ class CachedStore[K, V]( store.snapshot(from, to) } - override def checkpoint(id: String): Optional[Path] = { + override def checkpoint(id: CheckpointId): Optional[Path] = { store.checkpoint(id) } } diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala index 61ff059..bc6778e 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala @@ -29,6 +29,8 @@ import org.apache.samza.util.TimerUtil import java.nio.file.Path import java.util.Optional +import org.apache.samza.checkpoint.CheckpointId + /** * A key value store. * @@ -208,7 +210,7 @@ class KeyValueStorageEngine[K, V]( } } - def checkpoint(id: String): Optional[Path] = { + def checkpoint(id: CheckpointId): Optional[Path] = { updateTimer(metrics.checkpointNs) { trace("Checkpointing.") metrics.checkpoints.inc diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala index 4c238bb..320e801 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala @@ -22,6 +22,7 @@ package org.apache.samza.storage.kv import java.nio.file.Path import java.util.Optional +import org.apache.samza.checkpoint.CheckpointId import org.apache.samza.util.Logging import org.apache.samza.system.{OutgoingMessageEnvelope, SystemStreamPartition} import org.apache.samza.task.MessageCollector @@ -121,7 +122,7 @@ class LoggedStore[K, V]( store.snapshot(from, to) } - override def checkpoint(id: String): Optional[Path] = { + override def checkpoint(id: CheckpointId): Optional[Path] = { store.checkpoint(id) } } \ No newline at end 
of file diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala index 3bc4674..8bb6fa2 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala @@ -22,6 +22,8 @@ package org.apache.samza.storage.kv import java.nio.file.Path import java.util.Optional +import org.apache.samza.checkpoint.CheckpointId + import scala.collection.JavaConverters._ object NullSafeKeyValueStore { @@ -99,7 +101,7 @@ class NullSafeKeyValueStore[K, V](store: KeyValueStore[K, V]) extends KeyValueSt store.snapshot(from, to) } - override def checkpoint(id: String): Optional[Path] = { + override def checkpoint(id: CheckpointId): Optional[Path] = { store.checkpoint(id) } } diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala index 169452c..5b3456c 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala @@ -21,6 +21,8 @@ package org.apache.samza.storage.kv import java.nio.file.Path import java.util.Optional + +import org.apache.samza.checkpoint.CheckpointId import org.apache.samza.util.Logging import org.apache.samza.serializers._ @@ -166,7 +168,7 @@ class SerializedKeyValueStore[K, V]( } } - override def checkpoint(id: String): Optional[Path] = { + override def checkpoint(id: CheckpointId): Optional[Path] = { store.checkpoint(id) } } diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala index c20c2c5..c0fc080 100644 --- a/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala +++ 
b/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala @@ -24,6 +24,8 @@ import java.util import java.nio.file.Path import java.util.Optional +import org.apache.samza.checkpoint.CheckpointId + /** * A mock key-value store wrapper that handles serialization */ @@ -76,7 +78,7 @@ class MockKeyValueStore extends KeyValueStore[String, String] { throw new UnsupportedOperationException("iterator() not supported") } - override def checkpoint(id: String): Optional[Path] = { + override def checkpoint(id: CheckpointId): Optional[Path] = { Optional.empty() } } \ No newline at end of file diff --git a/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateIntegrationTest.java index 67a4de8..05e6737 100644 --- a/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateIntegrationTest.java +++ b/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateIntegrationTest.java @@ -89,7 +89,7 @@ public class TransactionalStateIntegrationTest extends StreamApplicationIntegrat put(TaskConfig.CHECKPOINT_MANAGER_FACTORY, "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory"); put(TaskConfig.COMMIT_MS, "-1"); // manual commit only put(TaskConfig.TRANSACTIONAL_STATE_RESTORE_ENABLED, "true"); - put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_CHANGELOG_STATE, "true"); + put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "true"); put(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR(), "1"); put(JobConfig.JOB_LOGGED_STORE_BASE_DIR, LOGGED_STORE_BASE_DIR); } }; diff --git a/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateMultiStoreIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateMultiStoreIntegrationTest.java index 0c678b6..41eb1ab 100644 --- a/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateMultiStoreIntegrationTest.java +++ 
b/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateMultiStoreIntegrationTest.java @@ -88,7 +88,7 @@ public class TransactionalStateMultiStoreIntegrationTest extends StreamApplicati put(TaskConfig.CHECKPOINT_MANAGER_FACTORY, "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory"); put(TaskConfig.COMMIT_MS, "-1"); // manual commit only put(TaskConfig.TRANSACTIONAL_STATE_RESTORE_ENABLED, "true"); - put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_CHANGELOG_STATE, "true"); + put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "true"); put(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR(), "1"); put(JobConfig.JOB_LOGGED_STORE_BASE_DIR, LOGGED_STORE_BASE_DIR); } };
