This is an automated email from the ASF dual-hosted git repository.
dchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 1373db238 [SAMZA-2787] GetDeleted API and Recover from
DeletedException (#1676)
1373db238 is described below
commit 1373db238683179123a15b246272cb0955f9b37c
Author: shekhars-li <[email protected]>
AuthorDate: Wed Aug 9 18:42:46 2023 -0700
[SAMZA-2787] GetDeleted API and Recover from DeletedException (#1676)
* GetDeleted API and Recover from DeletedException in commit for
BlobStoreBackendFactory
* Fix style issues, fix breaking test cases
* Fixed test failure
* Fix failing integration test - move storeConsumer start after init() in
ContainerStorageManager
* Style check fix
* Bug fixes, integration test
* Delete all blob from deleted SnapshotIndex
* clean up for final review
* Add integ test - delete snapshotindex and recover
* Fix failing integration tests related to init()
* Cleanup - remove unused code and move some code to util methods
* Unit test, minor refactoring
* Review comments - 1st round
* Refactor code
Remove taskcheckpoints mutation
* Address review comment
* Address review comment part 2
* Fix failing CSM test
* Addressed final review comments - more logs, consistent naming
* Checkstyle fix for test class
---------
Co-authored-by: Shekhar Sharma <[email protected]>
Co-authored-by: Shekhar Sharma <[email protected]>
---
.../samza/storage/blobstore/BlobStoreManager.java | 3 +-
.../samza/storage/TaskStorageCommitManager.java | 19 +-
.../storage/blobstore/BlobStoreBackupManager.java | 16 +-
.../storage/blobstore/BlobStoreRestoreManager.java | 62 ++--
.../storage/blobstore/util/BlobStoreUtil.java | 151 ++++++---
.../apache/samza/container/SamzaContainer.scala | 32 +-
.../org/apache/samza/container/TaskInstance.scala | 50 +--
.../samza/storage/ContainerStorageManager.java | 110 ++-----
.../ContainerStorageManagerRestoreUtil.java | 361 +++++++++++++++++++++
.../samza/storage/ContainerStorageManagerUtil.java | 3 +-
.../storage/TestTaskStorageCommitManager.java | 24 +-
.../blobstore/TestBlobStoreBackupManager.java | 16 +-
.../blobstore/TestBlobStoreRestoreManager.java | 31 +-
.../storage/blobstore/util/TestBlobStoreUtil.java | 93 +++---
.../samza/container/TestSamzaContainer.scala | 5 +-
.../apache/samza/container/TestTaskInstance.scala | 6 +-
.../samza/storage/TestContainerStorageManager.java | 200 +++++++++++-
.../TestTransactionalStateTaskBackupManager.java | 2 +-
.../samza/storage/MyStatefulApplication.java | 25 +-
.../kv/BlobStoreStateBackendIntegrationTest.java | 182 +++++++++++
.../samza/test/util/TestBlobStoreManager.java | 41 ++-
21 files changed, 1138 insertions(+), 294 deletions(-)
diff --git
a/samza-api/src/main/java/org/apache/samza/storage/blobstore/BlobStoreManager.java
b/samza-api/src/main/java/org/apache/samza/storage/blobstore/BlobStoreManager.java
index ef3f0c696..affa1f3d4 100644
---
a/samza-api/src/main/java/org/apache/samza/storage/blobstore/BlobStoreManager.java
+++
b/samza-api/src/main/java/org/apache/samza/storage/blobstore/BlobStoreManager.java
@@ -49,11 +49,12 @@ public interface BlobStoreManager {
* @param id Blob ID of the blob to get
* @param outputStream OutputStream to write the downloaded blob
* @param metadata User supplied {@link Metadata} of the request
+ * @param getDeletedBlob Flag to indicate if get should try to get a blob
marked for deletion but not yet compacted
* @return A future that completes when all the chunks are downloaded and
written successfully to the OutputStream
* @throws org.apache.samza.storage.blobstore.exceptions.DeletedException
returned future should complete
* exceptionally with DeletedException on failure with the blob
already deleted error.
*/
- CompletionStage<Void> get(String id, OutputStream outputStream, Metadata
metadata);
+ CompletionStage<Void> get(String id, OutputStream outputStream, Metadata
metadata, boolean getDeletedBlob);
/**
* Non-blocking call to mark a blob for deletion in the remote blob store
diff --git
a/samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
b/samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
index 59b010172..5ed4f1e0b 100644
---
a/samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
+++
b/samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
@@ -93,18 +93,21 @@ public class TaskStorageCommitManager {
this.metrics = metrics;
}
- public void init() {
+ public void init(Checkpoint checkpoint) {
// Assuming that container storage manager has already started and created
to stores
storageEngines = containerStorageManager.getAllStores(taskName);
- if (checkpointManager != null) {
- Checkpoint checkpoint = checkpointManager.readLastCheckpoint(taskName);
- LOG.debug("Last checkpoint on start for task: {} is: {}", taskName,
checkpoint);
- stateBackendToBackupManager.values()
- .forEach(storageBackupManager ->
storageBackupManager.init(checkpoint));
+ final Checkpoint latestCheckpoint;
+ if (checkpoint == null && checkpointManager != null) {
+ latestCheckpoint = checkpointManager.readLastCheckpoint(taskName);
+ LOG.debug("Last checkpoint on start for task: {} is: {}", taskName,
latestCheckpoint);
} else {
- stateBackendToBackupManager.values()
- .forEach(storageBackupManager -> storageBackupManager.init(null));
+ latestCheckpoint = checkpoint;
}
+ stateBackendToBackupManager.values()
+ .forEach(storageBackupManager -> {
+ LOG.debug("Init for storageBackupManager {} with checkpoint {}",
storageBackupManager, latestCheckpoint);
+ storageBackupManager.init(latestCheckpoint);
+ });
}
/**
diff --git
a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java
b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java
index cb7ffad8a..5f87de95b 100644
---
a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java
+++
b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java
@@ -133,14 +133,17 @@ public class BlobStoreBackupManager implements
TaskBackupManager {
@Override
public void init(Checkpoint checkpoint) {
long startTime = System.nanoTime();
- LOG.debug("Initializing blob store backup manager for task: {}", taskName);
+ LOG.debug("Initializing blob store backup manager for task: {} with
checkpoint {}", taskName, checkpoint);
blobStoreManager.init();
// Note: blocks the caller thread.
// TODO LOW shesharma exclude stores that are no longer configured during
init
+ // NOTE: Get SnapshotIndex with getDeleted set to false. A failure to get
a blob from SnapshotIndex would restart
+ // the container and the init()/restore() should be able to create a new
Snapshot in the blob store and recover.
+ // This helps with the rare race condition where SnapshotIndex was deleted
after the restore completed successfully.
Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes =
- blobStoreUtil.getStoreSnapshotIndexes(jobName, jobId, taskName,
checkpoint, new HashSet<>(storesToBackup));
+ blobStoreUtil.getStoreSnapshotIndexes(jobName, jobId, taskName,
checkpoint, new HashSet<>(storesToBackup), false);
this.prevStoreSnapshotIndexesFuture =
CompletableFuture.completedFuture(ImmutableMap.copyOf(prevStoreSnapshotIndexes));
metrics.initNs.set(System.nanoTime() - startTime);
@@ -288,7 +291,7 @@ public class BlobStoreBackupManager implements
TaskBackupManager {
Metadata requestMetadata =
new Metadata(Metadata.SNAPSHOT_INDEX_PAYLOAD_PATH,
Optional.empty(), jobName, jobId, taskName, storeName);
CompletionStage<SnapshotIndex> snapshotIndexFuture =
- blobStoreUtil.getSnapshotIndex(snapshotIndexBlobId,
requestMetadata);
+ blobStoreUtil.getSnapshotIndex(snapshotIndexBlobId,
requestMetadata, false);
// 1. remove TTL of index blob and all of its files and sub-dirs
marked for retention
CompletionStage<Void> removeTTLFuture =
@@ -327,7 +330,12 @@ public class BlobStoreBackupManager implements
TaskBackupManager {
});
return FutureUtil.allOf(removeTTLFutures, cleanupRemoteSnapshotFutures,
removePrevRemoteSnapshotFutures)
- .whenComplete((res, ex) -> metrics.cleanupNs.update(System.nanoTime()
- startTime));
+ .whenComplete((res, ex) -> {
+ if (ex != null) {
+ LOG.error("Could not finish cleanup. checkpointid: {}, store SCMs:
{}", checkpointId, storeSCMs, ex);
+ }
+ metrics.cleanupNs.update(System.nanoTime() - startTime);
+ });
}
@Override
diff --git
a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java
b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java
index 5010fd0c6..c370c62df 100644
---
a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java
+++
b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java
@@ -119,16 +119,30 @@ public class BlobStoreRestoreManager implements
TaskRestoreManager {
@Override
public void init(Checkpoint checkpoint) {
+ // By default, init without retrying deleted SnapshotIndex blob. We want
the init to fail if the SnapshotIndex blob
+ // was deleted. This allows us to mark the task for a full restore in
restore().
+ init(checkpoint, false);
+ }
+
+ /**
+ * Initialize state resources such as store directories.
+ * NOTE: init can be called twice. In case init fails with DeletedException
for the first time, it will be retried with
+ * getDeleted set to true.
+ * @param checkpoint Current task checkpoint
+ * @param getDeleted Flag to get deleted SnapshotIndex blob
+ */
+ public void init(Checkpoint checkpoint, boolean getDeleted) {
long startTime = System.nanoTime();
- LOG.debug("Initializing blob store restore manager for task: {}",
taskName);
+ LOG.debug("Initializing blob store restore manager for task {} with
getDeleted {}", taskName, getDeleted);
blobStoreManager.init();
// get previous SCMs from checkpoint
- prevStoreSnapshotIndexes = blobStoreUtil.getStoreSnapshotIndexes(jobName,
jobId, taskName, checkpoint, storesToRestore);
+ prevStoreSnapshotIndexes =
+ blobStoreUtil.getStoreSnapshotIndexes(jobName, jobId, taskName,
checkpoint, storesToRestore, getDeleted);
metrics.getSnapshotIndexNs.set(System.nanoTime() - startTime);
- LOG.trace("Found previous snapshot index during blob store restore manager
init for task: {} to be: {}",
- taskName, prevStoreSnapshotIndexes);
+ LOG.trace("Found previous snapshot index during blob store restore manager
init for task: {} to be: {} with getDeleted set to {}",
+ taskName, prevStoreSnapshotIndexes, getDeleted);
metrics.initStoreMetrics(storesToRestore);
@@ -150,7 +164,17 @@ public class BlobStoreRestoreManager implements
TaskRestoreManager {
@Override
public CompletableFuture<Void> restore() {
return restoreStores(jobName, jobId, taskModel.getTaskName(),
storesToRestore, prevStoreSnapshotIndexes, loggedBaseDir,
- storageConfig, metrics, storageManagerUtil, blobStoreUtil,
dirDiffUtil, executor);
+ storageConfig, metrics, storageManagerUtil, blobStoreUtil,
dirDiffUtil, executor, false);
+ }
+
+ /**
+ * Restore state from checkpoints and state snapshots.
+ * @param restoreDeleted This flag forces the restore to always download the
state from blob store with the get deleted
+ * flag enabled.
+ */
+ public CompletableFuture<Void> restore(boolean restoreDeleted) {
+ return restoreStores(jobName, jobId, taskModel.getTaskName(),
storesToRestore, prevStoreSnapshotIndexes,
+ loggedBaseDir, storageConfig, metrics, storageManagerUtil,
blobStoreUtil, dirDiffUtil, executor, restoreDeleted);
}
@Override
@@ -186,17 +210,9 @@ public class BlobStoreRestoreManager implements
TaskRestoreManager {
if (!storesToBackup.contains(storeName) &&
!storesToRestore.contains(storeName)) {
LOG.info("Removing task: {} store: {} from blob store. It is either no
longer used, " +
"or is no longer configured to be backed up or restored with blob
store.", taskName, storeName);
- DirIndex dirIndex = scmAndSnapshotIndex.getRight().getDirIndex();
Metadata requestMetadata =
new Metadata(Metadata.SNAPSHOT_INDEX_PAYLOAD_PATH,
Optional.empty(), jobName, jobId, taskName, storeName);
- CompletionStage<Void> storeDeletionFuture =
- blobStoreUtil.cleanUpDir(dirIndex, requestMetadata) // delete
files and sub-dirs previously marked for removal
- .thenComposeAsync(v ->
- blobStoreUtil.deleteDir(dirIndex, requestMetadata),
executor) // deleted files and dirs still present
- .thenComposeAsync(v -> blobStoreUtil.deleteSnapshotIndexBlob(
- scmAndSnapshotIndex.getLeft(), requestMetadata),
- executor); // delete the snapshot index blob
- storeDeletionFutures.add(storeDeletionFuture);
+
storeDeletionFutures.add(blobStoreUtil.cleanSnapshotIndex(scmAndSnapshotIndex.getLeft(),
scmAndSnapshotIndex.getRight(), requestMetadata));
}
});
@@ -211,10 +227,9 @@ public class BlobStoreRestoreManager implements
TaskRestoreManager {
Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes,
File loggedBaseDir, StorageConfig storageConfig,
BlobStoreRestoreManagerMetrics metrics,
StorageManagerUtil storageManagerUtil, BlobStoreUtil blobStoreUtil,
DirDiffUtil dirDiffUtil,
- ExecutorService executor) {
+ ExecutorService executor, boolean getDeleted) {
long restoreStartTime = System.nanoTime();
List<CompletionStage<Void>> restoreFutures = new ArrayList<>();
-
LOG.debug("Starting restore for task: {} stores: {}", taskName,
storesToRestore);
storesToRestore.forEach(storeName -> {
if (!prevStoreSnapshotIndexes.containsKey(storeName)) {
@@ -254,7 +269,12 @@ public class BlobStoreRestoreManager implements
TaskRestoreManager {
throw new SamzaException(String.format("Error deleting store
directory: %s", storeDir), e);
}
- boolean shouldRestore = shouldRestore(taskName.getTaskName(), storeName,
dirIndex,
+ // Restore from blob store if:
+ // 1. shouldRestore() returns true - there is a diff between local and
remote snapshot.
+ // 2. getDeleted is set - Some blobs in the blob store were deleted
incorrectly (SAMZA-2787). Download/restore
+ // everything locally ignoring the diff.
This will be backed up afresh by
+ // ContainerStorageManager recovery path.
+ boolean shouldRestore = getDeleted ||
shouldRestore(taskName.getTaskName(), storeName, dirIndex,
storeCheckpointDir, storageConfig, dirDiffUtil);
if (shouldRestore) { // restore the store from the remote blob store
@@ -268,7 +288,7 @@ public class BlobStoreRestoreManager implements
TaskRestoreManager {
metrics.storePreRestoreNs.get(storeName).set(System.nanoTime() -
storeRestoreStartTime);
enqueueRestore(jobName, jobId, taskName.toString(), storeName,
storeDir, dirIndex, storeRestoreStartTime,
- restoreFutures, blobStoreUtil, dirDiffUtil, metrics, executor);
+ restoreFutures, blobStoreUtil, dirDiffUtil, metrics, executor,
getDeleted);
} else {
LOG.debug("Renaming store checkpoint directory: {} to store directory:
{} since its contents are identical " +
"to the remote snapshot.", storeCheckpointDir, storeDir);
@@ -329,11 +349,11 @@ public class BlobStoreRestoreManager implements
TaskRestoreManager {
@VisibleForTesting
static void enqueueRestore(String jobName, String jobId, String taskName,
String storeName, File storeDir, DirIndex dirIndex,
long storeRestoreStartTime, List<CompletionStage<Void>> restoreFutures,
BlobStoreUtil blobStoreUtil,
- DirDiffUtil dirDiffUtil, BlobStoreRestoreManagerMetrics metrics,
ExecutorService executor) {
+ DirDiffUtil dirDiffUtil, BlobStoreRestoreManagerMetrics metrics,
ExecutorService executor, boolean getDeleted) {
Metadata requestMetadata = new Metadata(storeDir.getAbsolutePath(),
Optional.empty(), jobName, jobId, taskName, storeName);
CompletableFuture<Void> restoreFuture =
- blobStoreUtil.restoreDir(storeDir, dirIndex,
requestMetadata).thenRunAsync(() -> {
+ blobStoreUtil.restoreDir(storeDir, dirIndex, requestMetadata,
getDeleted).thenRunAsync(() -> {
metrics.storeRestoreNs.get(storeName).set(System.nanoTime() -
storeRestoreStartTime);
long postRestoreStartTime = System.nanoTime();
@@ -345,7 +365,7 @@ public class BlobStoreRestoreManager implements
TaskRestoreManager {
storeDir.getAbsolutePath()));
} else {
metrics.storePostRestoreNs.get(storeName).set(System.nanoTime() -
postRestoreStartTime);
- LOG.info("Restore from remote snapshot completed for store: {}",
storeDir);
+ LOG.info("Restore from remote snapshot completed for store: {}
with getDeleted set to {}", storeDir, getDeleted);
}
}, executor);
diff --git
a/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java
b/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java
index fb7104ed1..327eb7f33 100644
---
a/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java
+++
b/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java
@@ -110,10 +110,11 @@ public class BlobStoreUtil {
* @param checkpoint {@link Checkpoint} instance to get the store state
checkpoint markers from. Only
* {@link CheckpointV2} and newer are supported for blob
stores.
* @param storesToBackupOrRestore set of store names to be backed up or
restored
+ * @param getDeleted tries gets a deleted but not yet compacted
SnapshotIndex from the blob store.
* @return Map of store name to its blob id of snapshot indices and their
corresponding snapshot indices for the task.
*/
- public Map<String, Pair<String, SnapshotIndex>> getStoreSnapshotIndexes(
- String jobName, String jobId, String taskName, Checkpoint checkpoint,
Set<String> storesToBackupOrRestore) {
+ public Map<String, Pair<String, SnapshotIndex>>
getStoreSnapshotIndexes(String jobName, String jobId, String taskName,
+ Checkpoint checkpoint, Set<String> storesToBackupOrRestore, boolean
getDeleted) {
//TODO MED shesharma document error handling (checkpoint ver, blob not
found, getBlob)
if (checkpoint == null) {
LOG.debug("No previous checkpoint found for taskName: {}", taskName);
@@ -136,11 +137,12 @@ public class BlobStoreUtil {
storeSnapshotIndexBlobIds.forEach((storeName, snapshotIndexBlobId) -> {
if (storesToBackupOrRestore.contains(storeName)) {
try {
- LOG.debug("Getting snapshot index for taskName: {} store: {}
blobId: {}", taskName, storeName, snapshotIndexBlobId);
+ LOG.debug("Getting snapshot index for taskName: {} store: {}
blobId: {} with getDeleted set to {}",
+ taskName, storeName, snapshotIndexBlobId, getDeleted);
Metadata requestMetadata =
new Metadata(Metadata.SNAPSHOT_INDEX_PAYLOAD_PATH,
Optional.empty(), jobName, jobId, taskName, storeName);
CompletableFuture<SnapshotIndex> snapshotIndexFuture =
- getSnapshotIndex(snapshotIndexBlobId,
requestMetadata).toCompletableFuture();
+ getSnapshotIndex(snapshotIndexBlobId, requestMetadata,
getDeleted).toCompletableFuture();
Pair<CompletableFuture<String>, CompletableFuture<SnapshotIndex>>
pairOfFutures =
Pair.of(CompletableFuture.completedFuture(snapshotIndexBlobId),
snapshotIndexFuture);
@@ -162,15 +164,7 @@ public class BlobStoreUtil {
}
try {
- return FutureUtil.toFutureOfMap(t -> {
- Throwable unwrappedException =
FutureUtil.unwrapExceptions(CompletionException.class, t);
- if (unwrappedException instanceof DeletedException) {
- LOG.warn("Ignoring already deleted snapshot index for taskName: {}",
taskName, t);
- return true;
- } else {
- return false;
- }
- }, storeSnapshotIndexFutures).join();
+ return FutureUtil.toFutureOfMap(storeSnapshotIndexFutures).join();
} catch (Exception e) {
throw new SamzaException(
String.format("Error while waiting to get store snapshot indexes for
task %s", taskName), e);
@@ -182,12 +176,12 @@ public class BlobStoreUtil {
* @param blobId blob ID of the {@link SnapshotIndex} to get
* @return a Future containing the {@link SnapshotIndex}
*/
- public CompletableFuture<SnapshotIndex> getSnapshotIndex(String blobId,
Metadata metadata) {
+ public CompletableFuture<SnapshotIndex> getSnapshotIndex(String blobId,
Metadata metadata, boolean getDeleted) {
Preconditions.checkState(StringUtils.isNotBlank(blobId));
String opName = "getSnapshotIndex: " + blobId;
return FutureUtil.executeAsyncWithRetries(opName, () -> {
ByteArrayOutputStream indexBlobStream = new ByteArrayOutputStream(); //
no need to close ByteArrayOutputStream
- return blobStoreManager.get(blobId, indexBlobStream,
metadata).toCompletableFuture()
+ return blobStoreManager.get(blobId, indexBlobStream, metadata,
getDeleted).toCompletableFuture()
.thenApplyAsync(f ->
snapshotIndexSerde.fromBytes(indexBlobStream.toByteArray()), executor);
}, isCauseNonRetriable(), executor, retryPolicyConfig);
}
@@ -210,6 +204,51 @@ public class BlobStoreUtil {
}, isCauseNonRetriable(), executor, retryPolicyConfig);
}
+ /**
+ * Gets SnapshotIndex blob, cleans up a SnapshotIndex by recursively
deleting all blobs associated with files/subdirs
+ * inside the SnapshotIndex and finally deletes SnapshotIndex blob itself.
+ * @param snapshotIndexBlobId Blob id of SnapshotIndex
+ * @param requestMetadata Metadata of the request
+ * @param getDeleted Determines whether to try to get deleted SnapshotIndex
or not.
+ */
+ public CompletionStage<Void> cleanSnapshotIndex(String snapshotIndexBlobId,
Metadata requestMetadata, boolean getDeleted) {
+ Metadata getSnapshotRequest = new
Metadata(Metadata.SNAPSHOT_INDEX_PAYLOAD_PATH, Optional.empty(),
requestMetadata.getJobName(),
+ requestMetadata.getJobId(), requestMetadata.getTaskName(),
requestMetadata.getStoreName());
+ return getSnapshotIndex(snapshotIndexBlobId, getSnapshotRequest,
getDeleted)
+ .thenCompose(snapshotIndex -> cleanSnapshotIndex(snapshotIndexBlobId,
snapshotIndex, requestMetadata));
+ }
+
+ /**
+ * Cleans up a SnapshotIndex by recursively deleting all blobs associated
with files/subdirs inside the SnapshotIndex
+ * and finally deletes SnapshotIndex blob itself.
+ * @param snapshotIndexBlobId Blob if of SnapshotIndex
+ * @param snapshotIndex SnapshotIndex to delete
+ * @param requestMetadata Metadata of the request
+ */
+ public CompletionStage<Void> cleanSnapshotIndex(String snapshotIndexBlobId,
SnapshotIndex snapshotIndex, Metadata requestMetadata) {
+ DirIndex dirIndex = snapshotIndex.getDirIndex();
+ CompletionStage<Void> storeDeletionFuture =
+ cleanUpDir(dirIndex, requestMetadata) // delete files and sub-dirs
previously marked for removal
+ .thenComposeAsync(v ->
+ deleteDir(dirIndex, requestMetadata), executor) // deleted
files and dirs still present
+ .thenComposeAsync(v ->
deleteSnapshotIndexBlob(snapshotIndexBlobId, requestMetadata), executor) //
delete the snapshot index blob
+ .exceptionally(ex -> {
+ Throwable unwrappedException =
FutureUtil.unwrapExceptions(CompletionException.class,
+ FutureUtil.unwrapExceptions(SamzaException.class, ex));
+ // If a blob is already deleted, do not fail -> this may happen
if after we restore a
+ // deleted checkpoint and then try to clean up old checkpoint.
+ if (unwrappedException instanceof DeletedException) {
+ LOG.warn("Request {} received DeletedException on trying to
clean up SnapshotIndex {}. Ignoring the error.",
+ requestMetadata, snapshotIndexBlobId);
+ return null;
+ }
+ String msg = String.format("Request %s received error
deleting/cleaning up SnapshotIndex: %s",
+ requestMetadata, snapshotIndexBlobId);
+ throw new SamzaException(msg, ex);
+ });
+ return storeDeletionFuture;
+ }
+
/**
* WARNING: This method deletes the **SnapshotIndex blob** from the
snapshot. This should only be called to clean
* up an older snapshot **AFTER** all the files and sub-dirs to be deleted
from this snapshot are already deleted
@@ -229,10 +268,11 @@ public class BlobStoreUtil {
/**
* Non-blocking restore of a {@link SnapshotIndex} to local store by
downloading all the files and sub-dirs associated
* with this remote snapshot.
+ * NOTE: getDeleted flag sets if it reattempts to get a deleted file by
setting getDeleted flag in getFiles.
* @return A future that completes when all the async downloads completes
*/
- public CompletableFuture<Void> restoreDir(File baseDir, DirIndex dirIndex,
Metadata metadata) {
- LOG.debug("Restoring contents of directory: {} from remote snapshot.",
baseDir);
+ public CompletableFuture<Void> restoreDir(File baseDir, DirIndex dirIndex,
Metadata metadata, boolean getDeleted) {
+ LOG.debug("Restoring contents of directory: {} from remote snapshot.
GetDeletedFiles set to: {}", baseDir, getDeleted);
List<CompletableFuture<Void>> downloadFutures = new ArrayList<>();
@@ -255,7 +295,7 @@ public class BlobStoreUtil {
String opName = "restoreFile: " + fileToRestore.getAbsolutePath();
CompletableFuture<Void> fileRestoreFuture =
- FutureUtil.executeAsyncWithRetries(opName, () -> getFile(fileBlobs,
fileToRestore, requestMetadata),
+ FutureUtil.executeAsyncWithRetries(opName, () -> getFile(fileBlobs,
fileToRestore, requestMetadata, getDeleted),
isCauseNonRetriable(), executor, retryPolicyConfig);
downloadFutures.add(fileRestoreFuture);
}
@@ -264,7 +304,7 @@ public class BlobStoreUtil {
List<DirIndex> subDirs = dirIndex.getSubDirsPresent();
for (DirIndex subDir : subDirs) {
File subDirFile = Paths.get(baseDir.getAbsolutePath(),
subDir.getDirName()).toFile();
- downloadFutures.add(restoreDir(subDirFile, subDir, metadata));
+ downloadFutures.add(restoreDir(subDirFile, subDir, metadata,
getDeleted));
}
return FutureUtil.allOf(downloadFutures);
@@ -397,10 +437,11 @@ public class BlobStoreUtil {
* @param fileBlobs List of {@link FileBlob}s that constitute this file.
* @param fileToRestore File pointing to the local path where the file will
be restored.
* @param requestMetadata {@link Metadata} associated with this request
+ * @param getDeleted Flag that indicates whether to try to get Deleted (but
not yet compacted) files.
* @return a future that completes when the file is downloaded and written
or if an exception occurs.
*/
@VisibleForTesting
- CompletableFuture<Void> getFile(List<FileBlob> fileBlobs, File
fileToRestore, Metadata requestMetadata) {
+ CompletableFuture<Void> getFile(List<FileBlob> fileBlobs, File
fileToRestore, Metadata requestMetadata, boolean getDeleted) {
FileOutputStream outputStream = null;
try {
long restoreFileStartTime = System.nanoTime();
@@ -422,9 +463,9 @@ public class BlobStoreUtil {
CompletableFuture<Void> resultFuture =
CompletableFuture.completedFuture(null);
for (FileBlob fileBlob : fileBlobsCopy) {
resultFuture = resultFuture.thenComposeAsync(v -> {
- LOG.debug("Starting restore for file: {} with blob id: {} at offset:
{}", fileToRestore, fileBlob.getBlobId(),
- fileBlob.getOffset());
- return blobStoreManager.get(fileBlob.getBlobId(), finalOutputStream,
requestMetadata);
+ LOG.debug("Starting restore for file: {} with blob id: {} at offset:
{} with getDeleted set to: {}",
+ fileToRestore, fileBlob.getBlobId(), fileBlob.getOffset(),
getDeleted);
+ return blobStoreManager.get(fileBlob.getBlobId(), finalOutputStream,
requestMetadata, getDeleted);
}, executor);
}
@@ -565,6 +606,44 @@ public class BlobStoreUtil {
return CompletableFuture.allOf(deleteFutures.toArray(new
CompletableFuture[0]));
}
+ /**
+ * Get the {@link SnapshotIndex} using the blob id and marks all the blobs
associated with it to never expire,
+ * including the SnapshotIndex itself.
+ * @param indexBlobId Blob id of {@link SnapshotIndex}
+ * @param metadata {@link Metadata} related to the request
+ * @return A future that completes when all the files and subdirs associated
with this remote snapshot, as well as
+ * the {@link SnapshotIndex} associated with the snapshot are marked to
never expire.
+ */
+ public CompletableFuture<Void> removeTTLForSnapshotIndex(String indexBlobId,
Metadata metadata) {
+ return getSnapshotIndex(indexBlobId, metadata, false)
+ .thenCompose(snapshotIndex -> removeTTL(indexBlobId, snapshotIndex,
metadata));
+ }
+
+ /**
+ * Marks all the blobs associated with an {@link SnapshotIndex} to never
expire, including the SnapshotIndex
+ * @param snapshotIndex {@link SnapshotIndex} of the remote snapshot
+ * @param metadata {@link Metadata} related to the request
+ * @return A future that completes when all the files and subdirs associated
with this remote snapshot are marked to
+ * never expire.
+ */
+ public CompletionStage<Void> removeTTL(String indexBlobId, SnapshotIndex
snapshotIndex, Metadata metadata) {
+ SnapshotMetadata snapshotMetadata = snapshotIndex.getSnapshotMetadata();
+ LOG.debug("Marking contents of SnapshotIndex: {} to never expire",
snapshotMetadata.toString());
+
+ String opName = "removeTTL for SnapshotIndex for checkpointId: " +
snapshotMetadata.getCheckpointId();
+ Supplier<CompletionStage<Void>> removeDirIndexTTLAction =
+ () -> removeTTL(snapshotIndex.getDirIndex(),
metadata).toCompletableFuture();
+ CompletableFuture<Void> dirIndexTTLRemovalFuture =
+ FutureUtil.executeAsyncWithRetries(opName, removeDirIndexTTLAction,
isCauseNonRetriable(), executor, retryPolicyConfig);
+
+ return dirIndexTTLRemovalFuture.thenComposeAsync(aVoid -> {
+ String op2Name = "removeTTL for indexBlobId: " + indexBlobId;
+ Supplier<CompletionStage<Void>> removeIndexBlobTTLAction =
+ () -> blobStoreManager.removeTTL(indexBlobId,
metadata).toCompletableFuture();
+ return FutureUtil.executeAsyncWithRetries(op2Name,
removeIndexBlobTTLAction, isCauseNonRetriable(), executor, retryPolicyConfig);
+ }, executor);
+ }
+
/**
* Recursively mark all the blobs associated with the {@link DirIndex} to
never expire (remove TTL).
* @param dirIndex the {@link DirIndex} whose contents' TTL needs to be
removed
@@ -603,32 +682,6 @@ public class BlobStoreUtil {
return CompletableFuture.allOf(updateTTLsFuture.toArray(new
CompletableFuture[0]));
}
-
- /**
- * Marks all the blobs associated with an {@link SnapshotIndex} to never
expire.
- * @param snapshotIndex {@link SnapshotIndex} of the remote snapshot
- * @param metadata {@link Metadata} related to the request
- * @return A future that completes when all the files and subdirs associated
with this remote snapshot are marked to
- * never expire.
- */
- public CompletionStage<Void> removeTTL(String indexBlobId, SnapshotIndex
snapshotIndex, Metadata metadata) {
- SnapshotMetadata snapshotMetadata = snapshotIndex.getSnapshotMetadata();
- LOG.debug("Marking contents of SnapshotIndex: {} to never expire",
snapshotMetadata.toString());
-
- String opName = "removeTTL for SnapshotIndex for checkpointId: " +
snapshotMetadata.getCheckpointId();
- Supplier<CompletionStage<Void>> removeDirIndexTTLAction =
- () -> removeTTL(snapshotIndex.getDirIndex(),
metadata).toCompletableFuture();
- CompletableFuture<Void> dirIndexTTLRemovalFuture =
- FutureUtil.executeAsyncWithRetries(opName, removeDirIndexTTLAction,
isCauseNonRetriable(), executor, retryPolicyConfig);
-
- return dirIndexTTLRemovalFuture.thenComposeAsync(aVoid -> {
- String op2Name = "removeTTL for indexBlobId: " + indexBlobId;
- Supplier<CompletionStage<Void>> removeIndexBlobTTLAction =
- () -> blobStoreManager.removeTTL(indexBlobId,
metadata).toCompletableFuture();
- return FutureUtil.executeAsyncWithRetries(op2Name,
removeIndexBlobTTLAction, isCauseNonRetriable(), executor, retryPolicyConfig);
- }, executor);
- }
-
private static Predicate<Throwable> isCauseNonRetriable() {
return throwable -> {
Throwable unwrapped =
FutureUtil.unwrapExceptions(CompletionException.class, throwable);
diff --git
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index d9cbce974..999bdb7dd 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -30,7 +30,7 @@ import java.util.{Base64, Optional}
import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.samza.SamzaException
import org.apache.samza.application.ApplicationUtil
-import org.apache.samza.checkpoint.{CheckpointListener, OffsetManager,
OffsetManagerMetrics}
+import org.apache.samza.checkpoint.{Checkpoint, CheckpointListener,
OffsetManager, OffsetManagerMetrics}
import org.apache.samza.clustermanager.StandbyTaskUtil
import org.apache.samza.config.{StreamConfig, _}
import org.apache.samza.container.disk.DiskSpaceMonitor.Listener
@@ -780,11 +780,11 @@ class SamzaContainer(
// TODO HIGH pmaheshw SAMZA-2338: since store restore needs to trim
changelog messages,
// need to start changelog producers before the stores, but stop them
after stores.
startProducers
- startStores
+ val taskCheckpoints = startStores
startTableManager
startDiskSpaceMonitor
startHostStatisticsMonitor
- startTask
+ startTask(taskCheckpoints)
startConsumers
startSecurityManger
@@ -981,7 +981,14 @@ class SamzaContainer(
}
}
- def startStores {
+ /**
+ * Starts all the stores by restoring and recreating the stores, if necessary
+   * @return the latest checkpoint associated with each task. This
checkpoint does not have a Startpoint applied
+   *         over it.
+ * Note: In case of blob store manager, returned checkpoints for a
task may contain a checkpoint recreated on
+ * blob store, in case the previous one was recently deleted. More
details in SAMZA-2787
+ */
+ def startStores: util.Map[TaskName, Checkpoint] = {
info("Starting container storage manager.")
containerStorageManager.start()
}
@@ -993,10 +1000,21 @@ class SamzaContainer(
})
}
- def startTask {
- info("Initializing stream tasks.")
+ /**
+ * Init all task instances
+ * @param taskCheckpoints last checkpoint for a TaskName. This last
checkpoint could be different from the one returned
+ * from CheckpointManager#getLastCheckpoint. The new
checkpoint could be created in case the last
+ * checkpoint was recently deleted and
BlobStoreManager could recover it. More details in
+ * SAMZA-2787
+ */
+ def startTask(taskCheckpoints: util.Map[TaskName, Checkpoint]) {
+ info("Initializing stream tasks with taskCheckpoints %s."
format(taskCheckpoints) )
- taskInstances.values.foreach(_.initTask)
+ taskInstances.keys.foreach { taskName =>
+ val taskInstance = taskInstances(taskName)
+ val checkpoint = taskCheckpoints.asScala.getOrElse(taskName, null)
+ taskInstance.initTask(Some(checkpoint))
+ }
}
def startAdmins {
diff --git
a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index 5f35ce21e..285e7c877 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -24,10 +24,10 @@ import java.util.{Collections, Objects, Optional}
import java.util.concurrent.{CompletableFuture, ExecutorService,
ScheduledExecutorService, Semaphore, TimeUnit}
import org.apache.samza.SamzaException
import org.apache.samza.checkpoint.kafka.{KafkaChangelogSSPOffset,
KafkaStateCheckpointMarker}
-import org.apache.samza.checkpoint.{CheckpointId, CheckpointV1, CheckpointV2,
OffsetManager}
+import org.apache.samza.checkpoint.{Checkpoint, CheckpointId, CheckpointV1,
CheckpointV2, OffsetManager}
import org.apache.samza.config.{Config, JobConfig, StreamConfig, TaskConfig}
import org.apache.samza.context._
-import org.apache.samza.job.model.{JobModel, TaskModel}
+import org.apache.samza.job.model.{JobModel, TaskMode, TaskModel}
import org.apache.samza.scheduler.{CallbackSchedulerImpl, EpochTimeScheduler,
ScheduledCallback}
import org.apache.samza.storage.kv.KeyValueStore
import org.apache.samza.storage.{ContainerStorageManager,
TaskStorageCommitManager}
@@ -149,31 +149,41 @@ class TaskInstance(
}
}
- def initTask {
+ def initTask(lastTaskCheckpoint: Option[Checkpoint]) {
initCaughtUpMapping()
- if (commitManager != null) {
- debug("Starting commit manager for taskName: %s" format taskName)
+ val isStandByTask = taskModel.getTaskMode == TaskMode.Standby
+ var checkpoint: Checkpoint = lastTaskCheckpoint.orNull
+ debug("initTask with optional checkpoint %s" format(checkpoint))
- commitManager.init()
+ if (commitManager != null) {
+ debug("Starting commit manager for taskName %s" format taskName)
+ if (isStandByTask) {
+ debug("Passing null to init for standby taskName %s" format taskName)
+        // pass null in case of standby task. This ensures the checkpoint is
always read from the checkpoint topic
+ commitManager.init(null)
+ } else {
+ debug("init taskName %s with Checkpoint %s" format(taskName,
checkpoint))
+ commitManager.init(checkpoint)
+ }
} else {
debug("Skipping commit manager initialization for taskName: %s" format
taskName)
}
- if (offsetManager != null) {
- val checkpoint = offsetManager.getLastTaskCheckpoint(taskName)
- // Only required for checkpointV2
- if (checkpoint != null && checkpoint.getVersion == 2) {
- val checkpointV2 = checkpoint.asInstanceOf[CheckpointV2]
- // call cleanUp on backup managers in case the container previously
failed during commit
- // before completing this step
-
- // WARNING: cleanUp is NOT optional with blob stores since this is
where we reset the TTL for
- // tracked blobs. if this TTL reset is skipped, some of the blobs
retained by future commits may
- // be deleted in the background by the blob store, leading to data
loss.
- info("Cleaning up stale state from previous run for taskName: %s"
format taskName)
- commitManager.cleanUp(checkpointV2.getCheckpointId,
checkpointV2.getStateCheckpointMarkers)
- }
+ if (offsetManager != null && isStandByTask) {
+ checkpoint = offsetManager.getLastTaskCheckpoint(taskName)
+ }
+ // Only required for checkpointV2
+ if (checkpoint != null && checkpoint.getVersion == 2) {
+ val checkpointV2 = checkpoint.asInstanceOf[CheckpointV2]
+ // call cleanUp on backup managers in case the container previously
failed during commit
+ // before completing this step
+
+ // WARNING: cleanUp is NOT optional with blob stores since this is where
we reset the TTL for
+ // tracked blobs. if this TTL reset is skipped, some of the blobs
retained by future commits may
+ // be deleted in the background by the blob store, leading to data loss.
+ info("Cleaning up stale state from previous run for taskName: %s" format
taskName)
+ commitManager.cleanUp(checkpointV2.getCheckpointId,
checkpointV2.getStateCheckpointMarkers)
}
if (taskConfig.getTransactionalStateRestoreEnabled() &&
taskConfig.getCommitMs > 0) {
diff --git
a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index c9ee065c0..f46a823e3 100644
---
a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++
b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -22,17 +22,14 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.File;
import java.nio.file.Path;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.samza.SamzaException;
@@ -48,7 +45,6 @@ import org.apache.samza.context.ContainerContext;
import org.apache.samza.context.JobContext;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.TaskMode;
-import org.apache.samza.metrics.Gauge;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.SerdeManager;
import org.apache.samza.storage.blobstore.BlobStoreStateBackendFactory;
@@ -207,10 +203,14 @@ public class ContainerStorageManager {
new
ThreadFactoryBuilder().setDaemon(true).setNameFormat(RESTORE_THREAD_NAME).build());
}
-
- public void start() throws SamzaException, InterruptedException {
+ /**
+ * Starts all the task stores.
+ * Returns the latest checkpoint for each task. This checkpoint may be
different from the lastCheckpoint returned by
+ * checkpoint manager in case of a BlobStoreRestoreManager.
+ */
+ public Map<TaskName, Checkpoint> start() throws SamzaException,
InterruptedException {
// Restores and recreates stores.
- restoreStores();
+ Map<TaskName, Checkpoint> taskCheckpoints = restoreStores();
// Shutdown restore executor since it will no longer be used
try {
@@ -248,10 +248,11 @@ public class ContainerStorageManager {
});
isStarted = true;
+ return taskCheckpoints;
}
// Restoration of all stores, in parallel across tasks
- private void restoreStores() throws InterruptedException {
+ private Map<TaskName, Checkpoint> restoreStores() throws
InterruptedException {
LOG.info("Store Restore started");
Set<TaskName> activeTasks =
ContainerStorageManagerUtil.getTasks(containerModel, TaskMode.Active).keySet();
// Find all non-side input stores
@@ -263,6 +264,7 @@ public class ContainerStorageManager {
// Obtain the checkpoints for each task
Map<TaskName, Map<String, TaskRestoreManager>> taskRestoreManagers = new
HashMap<>();
Map<TaskName, Checkpoint> taskCheckpoints = new HashMap<>();
+ Map<TaskName, Map<String, Set<String>>> taskBackendFactoryToStoreNames =
new HashMap<>();
containerModel.getTasks().forEach((taskName, taskModel) -> {
Checkpoint taskCheckpoint = null;
if (checkpointManager != null && activeTasks.contains(taskName)) {
@@ -301,79 +303,30 @@ public class ContainerStorageManager {
samzaContainerMetrics, taskInstanceMetrics,
taskInstanceCollectors, serdes,
loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config,
clock);
taskRestoreManagers.put(taskName, taskStoreRestoreManagers);
+ taskBackendFactoryToStoreNames.put(taskName, backendFactoryToStoreNames);
});
- // Initialize each TaskStorageManager
- taskRestoreManagers.forEach((taskName, restoreManagers) ->
- restoreManagers.forEach((factoryName, taskRestoreManager) ->
- taskRestoreManager.init(taskCheckpoints.get(taskName))
- )
- );
-
- // Start each store consumer once.
- // Note: These consumers are per system and only changelog system store
consumers will be started.
- // Some TaskRestoreManagers may not require the consumer to to be started,
but due to the agnostic nature of
- // ContainerStorageManager we always start the changelog consumer here in
case it is required
-
this.storeConsumers.values().stream().distinct().forEach(SystemConsumer::start);
-
- List<Future<Void>> taskRestoreFutures = new ArrayList<>();
-
- // Submit restore callable for each taskInstance
- taskRestoreManagers.forEach((taskInstance, restoreManagersMap) -> {
- // Submit for each restore factory
- restoreManagersMap.forEach((factoryName, taskRestoreManager) -> {
- long startTime = System.currentTimeMillis();
- String taskName = taskInstance.getTaskName();
- LOG.info("Starting restore for state for task: {}", taskName);
- CompletableFuture<Void> restoreFuture =
taskRestoreManager.restore().handle((res, ex) -> {
- // Stop all persistent stores after restoring. Certain persistent
stores opened in BulkLoad mode are compacted
- // on stop, so paralleling stop() also parallelizes their compaction
(a time-intensive operation).
- try {
- taskRestoreManager.close();
- } catch (Exception e) {
- LOG.error("Error closing restore manager for task: {} after {}
restore",
- taskName, ex != null ? "unsuccessful" : "successful", e);
- // ignore exception from close. container may still be be able to
continue processing/backups
- // if restore manager close fails.
- }
-
- long timeToRestore = System.currentTimeMillis() - startTime;
- if (samzaContainerMetrics != null) {
- Gauge taskGauge =
samzaContainerMetrics.taskStoreRestorationMetrics().getOrDefault(taskInstance,
null);
-
- if (taskGauge != null) {
- taskGauge.set(timeToRestore);
- }
- }
-
- if (ex != null) {
- // log and rethrow exception to communicate restore failure
- String msg = String.format("Error restoring state for task: %s",
taskName);
- LOG.error(msg, ex);
- throw new SamzaException(msg, ex); // wrap in unchecked exception
to throw from lambda
- } else {
- return null;
- }
- });
-
- taskRestoreFutures.add(restoreFuture);
- });
- });
+ // Init all taskRestores and if successful, restores all the task stores
concurrently
+ LOG.debug("Pre init and restore checkpoints is: {}", taskCheckpoints);
+ CompletableFuture<Map<TaskName, Checkpoint>>
initRestoreAndNewCheckpointFuture =
+
ContainerStorageManagerRestoreUtil.initAndRestoreTaskInstances(taskRestoreManagers,
samzaContainerMetrics,
+ checkpointManager, jobContext, containerModel, taskCheckpoints,
taskBackendFactoryToStoreNames, config,
+ restoreExecutor, taskInstanceMetrics, loggedStoreBaseDirectory,
storeConsumers);
- // Loop-over the future list to wait for each restore to finish, catch any
exceptions during restore and throw
- // as samza exceptions
- for (Future<Void> future : taskRestoreFutures) {
- try {
- future.get();
- } catch (InterruptedException e) {
- LOG.warn("Received an interrupt during store restoration. Interrupting
the restore executor to exit "
- + "prematurely without restoring full state.");
- restoreExecutor.shutdownNow();
- throw e;
- } catch (Exception e) {
- LOG.error("Exception when restoring state.", e);
- throw new SamzaException("Exception when restoring state.", e);
- }
+ // Update the task checkpoints map, if it was updated during the restore.
Throw an exception if the restore or
+ // creating a new checkpoint (in case of BlobStoreBackendFactory) failed.
+ try {
+ Map<TaskName, Checkpoint> newTaskCheckpoints =
initRestoreAndNewCheckpointFuture.get();
+ taskCheckpoints.putAll(newTaskCheckpoints);
+ LOG.debug("Post init and restore checkpoints is: {}. NewTaskCheckpoints
are: {}", taskCheckpoints, newTaskCheckpoints);
+ } catch (InterruptedException e) {
+ LOG.warn("Received an interrupt during store restoration. Interrupting
the restore executor to exit "
+ + "prematurely without restoring full state.");
+ restoreExecutor.shutdownNow();
+ throw e;
+ } catch (Exception e) {
+ LOG.error("Exception when restoring state.", e);
+ throw new SamzaException("Exception when restoring state.", e);
}
// Stop each store consumer once
@@ -400,6 +353,7 @@ public class ContainerStorageManager {
});
LOG.info("Store Restore complete");
+ return taskCheckpoints;
}
/**
diff --git
a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerRestoreUtil.java
b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerRestoreUtil.java
new file mode 100644
index 000000000..db0a5be46
--- /dev/null
+++
b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerRestoreUtil.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import org.apache.commons.io.FileUtils;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.CheckpointManager;
+import org.apache.samza.checkpoint.CheckpointV2;
+import org.apache.samza.config.BlobStoreConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.container.SamzaContainerMetrics;
+import org.apache.samza.container.TaskInstanceMetrics;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.storage.blobstore.BlobStoreBackupManager;
+import org.apache.samza.storage.blobstore.BlobStoreManager;
+import org.apache.samza.storage.blobstore.BlobStoreManagerFactory;
+import org.apache.samza.storage.blobstore.BlobStoreRestoreManager;
+import org.apache.samza.storage.blobstore.BlobStoreStateBackendFactory;
+import org.apache.samza.storage.blobstore.Metadata;
+import org.apache.samza.storage.blobstore.exceptions.DeletedException;
+import
org.apache.samza.storage.blobstore.metrics.BlobStoreBackupManagerMetrics;
+import
org.apache.samza.storage.blobstore.metrics.BlobStoreRestoreManagerMetrics;
+import org.apache.samza.storage.blobstore.util.BlobStoreUtil;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.FutureUtil;
+import org.apache.samza.util.ReflectionUtil;
+import org.apache.samza.util.SystemClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ContainerStorageManagerRestoreUtil {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ContainerStorageManagerRestoreUtil.class);
+
+ /**
+ * Inits and Restores all the task stores.
+ * Note: In case of {@link BlobStoreRestoreManager}, this method retries
init and restore with getDeleted flag if it
+ * receives a {@link DeletedException}. This will create a new checkpoint
for the corresponding task.
+ */
+ public static CompletableFuture<Map<TaskName, Checkpoint>>
initAndRestoreTaskInstances(
+ Map<TaskName, Map<String, TaskRestoreManager>> taskRestoreManagers,
SamzaContainerMetrics samzaContainerMetrics,
+ CheckpointManager checkpointManager, JobContext jobContext,
ContainerModel containerModel,
+ Map<TaskName, Checkpoint> taskCheckpoints, Map<TaskName, Map<String,
Set<String>>> taskBackendFactoryToStoreNames,
+ Config config, ExecutorService executor, Map<TaskName,
TaskInstanceMetrics> taskInstanceMetrics,
+ File loggerStoreDir, Map<String, SystemConsumer> storeConsumers) {
+
+ Set<String> forceRestoreTasks = new HashSet<>();
+ // Initialize each TaskStorageManager.
+ taskRestoreManagers.forEach((taskName, restoreManagers) ->
+ restoreManagers.forEach((factoryName, taskRestoreManager) -> {
+ try {
+ taskRestoreManager.init(taskCheckpoints.get(taskName));
+ } catch (SamzaException ex) {
+ if (isUnwrappedExceptionDeletedException(ex) && taskRestoreManager
instanceof BlobStoreRestoreManager) {
+ // Get deleted SnapshotIndex blob with GetDeleted and mark the
task to be restored with GetDeleted as well.
+ // this ensures that the restore downloads the snapshot,
recreates a new snapshot, uploads it to blob store
+ // and creates a new checkpoint.
+ ((BlobStoreRestoreManager)
taskRestoreManager).init(taskCheckpoints.get(taskName), true);
+ forceRestoreTasks.add(taskName.getTaskName());
+ } else {
+ // log and rethrow exception to communicate restore failure
+ String msg = String.format("init failed for
BlobStoreRestoreManager, task: %s with GetDeleted set to true", taskName);
+ LOG.error(msg, ex);
+ throw new SamzaException(msg, ex);
+ }
+ }
+ })
+ );
+
+ // Start each store consumer once.
+ // Note: These consumers are per system and only changelog system store
consumers will be started.
+ // Some TaskRestoreManagers may not require the consumer to be started,
but due to the agnostic nature of
+ // ContainerStorageManager we always start the changelog consumer here in
case it is required
+ storeConsumers.values().stream().distinct().forEach(SystemConsumer::start);
+
+ return restoreAllTaskInstances(taskRestoreManagers, taskCheckpoints,
taskBackendFactoryToStoreNames, jobContext,
+ containerModel, samzaContainerMetrics, checkpointManager, config,
taskInstanceMetrics, executor, loggerStoreDir,
+ forceRestoreTasks);
+ }
+
  /**
   * Restores all TaskInstances and returns a future for each TaskInstance restore. Note: In case of
   * {@link BlobStoreRestoreManager}, this method retries the restore with the getDeleted flag if it receives a
   * {@link DeletedException}. This will create a new Checkpoint.
   *
   * @param forceRestoreTask task names whose init already hit a DeletedException; their restore is forced down
   *                         the getDeleted recovery path without attempting a normal restore first
   * @return future completing with the map of task name to newly created checkpoint, populated only for tasks
   *         that went through deleted-blob recovery
   */
  private static CompletableFuture<Map<TaskName, Checkpoint>> restoreAllTaskInstances(
      Map<TaskName, Map<String, TaskRestoreManager>> taskRestoreManagers, Map<TaskName, Checkpoint> taskCheckpoints,
      Map<TaskName, Map<String, Set<String>>> taskBackendFactoryToStoreNames, JobContext jobContext,
      ContainerModel containerModel, SamzaContainerMetrics samzaContainerMetrics, CheckpointManager checkpointManager,
      Config config, Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics, ExecutorService executor,
      File loggedStoreDir, Set<String> forceRestoreTask) {

    // ConcurrentHashMap: populated from restore-completion callbacks that may run on different executor threads.
    Map<TaskName, CompletableFuture<Checkpoint>> newTaskCheckpoints = new ConcurrentHashMap<>();
    List<Future<Void>> taskRestoreFutures = new ArrayList<>();

    // Submit restore callable for each taskInstance
    taskRestoreManagers.forEach((taskInstanceName, restoreManagersMap) -> {
      // Submit for each restore factory
      restoreManagersMap.forEach((factoryName, taskRestoreManager) -> {
        long startTime = System.currentTimeMillis();
        String taskName = taskInstanceName.getTaskName();
        LOG.info("Starting restore for state for task: {}", taskName);

        CompletableFuture<Void> restoreFuture;
        if (forceRestoreTask.contains(taskName) && taskRestoreManager instanceof BlobStoreRestoreManager) {
          // If init was retried with getDeleted, force restore with getDeleted as well, since init only inits the
          // restoreManager with deleted SnapshotIndex but does not retry to recover the deleted blobs and delegates it
          // to restore().
          // Create an empty future that fails immediately with DeletedException to force retry in restore.
          restoreFuture = new CompletableFuture<>();
          restoreFuture.completeExceptionally(new SamzaException(new DeletedException()));
        } else {
          restoreFuture = taskRestoreManager.restore();
        }

        CompletableFuture<Void> taskRestoreFuture = restoreFuture.handle((res, ex) -> {
          updateRestoreTime(startTime, samzaContainerMetrics, taskInstanceName);

          if (ex != null) {
            if (isUnwrappedExceptionDeletedException(ex)) {
              LOG.warn("Received DeletedException during restore for task {}. Attempting to get blobs with getDeleted flag",
                  taskInstanceName.getTaskName());

              // Try to restore with getDeleted flag
              CompletableFuture<Checkpoint> future =
                  restoreDeletedSnapshot(taskInstanceName, taskCheckpoints,
                      checkpointManager, taskRestoreManager, config, taskInstanceMetrics, executor,
                      taskBackendFactoryToStoreNames.get(taskInstanceName).get(factoryName), loggedStoreDir,
                      jobContext, containerModel);
              // NOTE(review): put() on a ConcurrentHashMap does not throw checked exceptions here; failures of the
              // recovery itself surface later when FutureUtil.toFutureOfMap resolves the collected futures. This
              // try/catch appears to be dead code — confirm intent.
              try {
                newTaskCheckpoints.put(taskInstanceName, future);
              } catch (Exception e) {
                String msg = String.format("DeletedException during restore task: %s after retrying to get deleted blobs.", taskName);
                throw new SamzaException(msg, e);
              } finally {
                updateRestoreTime(startTime, samzaContainerMetrics, taskInstanceName);
              }
            } else {
              // log and rethrow exception to communicate restore failure
              String msg = String.format("Error restoring state for task: %s", taskName);
              LOG.error(msg, ex);
              throw new SamzaException(msg, ex); // wrap in unchecked exception to throw from lambda
            }
          }

          // Stop all persistent stores after restoring. Certain persistent stores opened in BulkLoad mode are compacted
          // on stop, so paralleling stop() also parallelizes their compaction (a time-intensive operation).
          try {
            taskRestoreManager.close();
          } catch (Exception e) {
            LOG.error("Error closing restore manager for task: {} after {} restore", taskName,
                ex != null ? "unsuccessful" : "successful", e);
            // ignore exception from close. container may still be able to continue processing/backups
            // if restore manager close fails.
          }
          return null;
        });
        taskRestoreFutures.add(taskRestoreFuture);
      });
    });
    // Complete only when every per-task restore (including any deleted-blob recovery scheduling) has finished,
    // then resolve the collected per-task checkpoint futures into a plain map.
    CompletableFuture<Void> restoreFutures = CompletableFuture.allOf(taskRestoreFutures.toArray(new CompletableFuture[0]));
    return restoreFutures.thenCompose(ignoredVoid -> FutureUtil.toFutureOfMap(newTaskCheckpoints));
  }
+
  /**
   * Returns a single future that guarantees all the following are completed, in this order:
   * 1. Restore state locally by getting deleted blobs from the blob store.
   * 2. Create a new snapshot from restored state by backing it up on the blob store.
   * 3. Remove TTL from the new Snapshot and all the associated blobs in the blob store.
   * 4. Clean up old/deleted Snapshot.
   * 5. Create and write the new checkpoint to checkpoint topic.
   *
   * @param taskRestoreManager must be a {@link BlobStoreRestoreManager} (cast below); callers only invoke this
   *                           after observing a DeletedException from a blob-store restore
   * @return future completing with the newly written checkpoint, or failing with SamzaException if any stage fails
   */
  private static CompletableFuture<Checkpoint> restoreDeletedSnapshot(TaskName taskName,
      Map<TaskName, Checkpoint> taskCheckpoints, CheckpointManager checkpointManager,
      TaskRestoreManager taskRestoreManager, Config config, Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics,
      ExecutorService executor, Set<String> storesToRestore, File loggedStoreBaseDirectory, JobContext jobContext,
      ContainerModel containerModel) {

    // if taskInstanceMetrics are specified use those for store metrics,
    // otherwise (in case of StorageRecovery) use a blank MetricsRegistryMap
    MetricsRegistry metricsRegistry =
        taskInstanceMetrics.get(taskName) != null ? taskInstanceMetrics.get(taskName).registry()
            : new MetricsRegistryMap();

    BlobStoreManager blobStoreManager = getBlobStoreManager(config, executor);
    JobConfig jobConfig = new JobConfig(config);
    BlobStoreUtil blobStoreUtil =
        new BlobStoreUtil(blobStoreManager, executor, new BlobStoreConfig(config), null,
            new BlobStoreRestoreManagerMetrics(metricsRegistry));

    BlobStoreRestoreManager blobStoreRestoreManager = (BlobStoreRestoreManager) taskRestoreManager;

    // New checkpoint id for the recreated snapshot; old SCMs identify the deleted snapshot to clean up in step 4.
    CheckpointId checkpointId = CheckpointId.create();
    Map<String, String> oldSCMs = ((CheckpointV2) taskCheckpoints.get(taskName)).getStateCheckpointMarkers()
        .get(BlobStoreStateBackendFactory.class.getName());

    // 1. Restore state with getDeleted flag set to true
    CompletableFuture<Void> restoreFuture = blobStoreRestoreManager.restore(true);

    // 2. Create a new checkpoint and back it up on the blob store
    CompletableFuture<Map<String, String>> backupStoresFuture = restoreFuture.thenCompose(
        r -> backupRecoveredStore(jobContext, containerModel, config, taskName, storesToRestore, checkpointId,
            loggedStoreBaseDirectory, blobStoreManager, metricsRegistry, executor));

    // 3. Mark new Snapshots to never expire
    CompletableFuture<Void> removeNewSnapshotsTTLFuture = backupStoresFuture.thenCompose(
        storeSCMs -> {
          List<CompletableFuture<Void>> removeTTLForSnapshotIndexFutures = new ArrayList<>();
          storeSCMs.forEach((store, scm) -> {
            Metadata requestMetadata = new Metadata(Metadata.SNAPSHOT_INDEX_PAYLOAD_PATH, Optional.empty(),
                jobConfig.getName().get(), jobConfig.getJobId(), taskName.getTaskName(), store);
            removeTTLForSnapshotIndexFutures.add(blobStoreUtil.removeTTLForSnapshotIndex(scm, requestMetadata));
          });
          return CompletableFuture.allOf(removeTTLForSnapshotIndexFutures.toArray(new CompletableFuture[0]));
        });

    // 4. Delete prev SnapshotIndex including files/subdirs
    CompletableFuture<Void> deleteOldSnapshotsFuture = removeNewSnapshotsTTLFuture.thenCompose(
        ignore -> {
          List<CompletableFuture<Void>> deletePrevSnapshotFutures = new ArrayList<>();
          oldSCMs.forEach((store, oldSCM) -> {
            Metadata requestMetadata = new Metadata(Metadata.SNAPSHOT_INDEX_PAYLOAD_PATH, Optional.empty(),
                jobConfig.getName().get(), jobConfig.getJobId(), taskName.getTaskName(), store);
            deletePrevSnapshotFutures.add(blobStoreUtil.cleanSnapshotIndex(oldSCM, requestMetadata, true).toCompletableFuture());
          });
          return CompletableFuture.allOf(deletePrevSnapshotFutures.toArray(new CompletableFuture[0]));
        });

    // 5. create new checkpoint
    // thenCombine with backupStoresFuture re-reads the already-completed SCM map; ordering is still
    // enforced by deleteOldSnapshotsFuture.
    CompletableFuture<Checkpoint> newTaskCheckpointsFuture =
        deleteOldSnapshotsFuture.thenCombine(backupStoresFuture, (aVoid, scms) ->
            writeNewCheckpoint(taskName, checkpointId, scms, checkpointManager));

    return newTaskCheckpointsFuture.exceptionally(ex -> {
      String msg = String.format("Could not restore task: %s after attempting to restore deleted blobs.", taskName);
      throw new SamzaException(msg, ex);
    });
  }
+
+ private static CompletableFuture<Map<String, String>>
backupRecoveredStore(JobContext jobContext,
+ ContainerModel containerModel, Config config, TaskName taskName,
Set<String> storesToBackup,
+ CheckpointId newCheckpointId, File loggedStoreBaseDirectory,
BlobStoreManager blobStoreManager,
+ MetricsRegistry metricsRegistry, ExecutorService executor) {
+
+ BlobStoreBackupManagerMetrics blobStoreBackupManagerMetrics = new
BlobStoreBackupManagerMetrics(metricsRegistry);
+ BlobStoreBackupManager blobStoreBackupManager =
+ new BlobStoreBackupManager(jobContext.getJobModel(), containerModel,
containerModel.getTasks().get(taskName),
+ executor, blobStoreBackupManagerMetrics, config,
SystemClock.instance(), loggedStoreBaseDirectory,
+ new StorageManagerUtil(), blobStoreManager);
+
+ // create checkpoint dir as a copy of store dir
+ createCheckpointDirFromStoreDirCopy(taskName,
containerModel.getTasks().get(taskName),
+ loggedStoreBaseDirectory, storesToBackup, newCheckpointId);
+ // upload to blob store and return future
+ return blobStoreBackupManager.upload(newCheckpointId, new HashMap<>());
+ }
+
+ private static Checkpoint writeNewCheckpoint(TaskName taskName, CheckpointId
checkpointId,
+ Map<String, String> uploadSCMs, CheckpointManager checkpointManager) {
+ CheckpointV2 oldCheckpoint = (CheckpointV2)
checkpointManager.readLastCheckpoint(taskName);
+ Map<SystemStreamPartition, String> inputOffsets =
oldCheckpoint.getOffsets();
+
+ ImmutableMap.Builder<String, Map<String, String>> newSCMBuilder =
ImmutableMap.builder();
+ newSCMBuilder.put(BlobStoreStateBackendFactory.class.getName(),
uploadSCMs);
+
+ Map<String, String> oldKafkaChangelogStateCheckpointMarkers =
+
oldCheckpoint.getStateCheckpointMarkers().get(KafkaChangelogStateBackendFactory.class.getName());
+ if (oldKafkaChangelogStateCheckpointMarkers != null) {
+ newSCMBuilder.put(KafkaChangelogStateBackendFactory.class.getName(),
oldKafkaChangelogStateCheckpointMarkers);
+ }
+
+ CheckpointV2 checkpointV2 = new CheckpointV2(checkpointId, inputOffsets,
newSCMBuilder.build());
+ checkpointManager.writeCheckpoint(taskName, checkpointV2);
+ return checkpointV2;
+ }
+
+ private static void createCheckpointDirFromStoreDirCopy(TaskName taskName,
TaskModel taskModel,
+ File loggedStoreBaseDir, Set<String> storeName, CheckpointId
checkpointId) {
+ StorageManagerUtil storageManagerUtil = new StorageManagerUtil();
+ for (String store : storeName) {
+ try {
+ File storeDirectory =
+ storageManagerUtil.getTaskStoreDir(loggedStoreBaseDir, store,
taskName, taskModel.getTaskMode());
+ File checkpointDir = new
File(storageManagerUtil.getStoreCheckpointDir(storeDirectory, checkpointId));
+ FileUtils.copyDirectory(storeDirectory, checkpointDir);
+ } catch (IOException exception) {
+ String msg = String.format("Unable to create a copy of store directory
%s into checkpoint dir %s while "
+ + "attempting to recover from DeletedException", store,
checkpointId);
+ throw new SamzaException(msg, exception);
+ }
+ }
+ }
+
+ private static BlobStoreManager getBlobStoreManager(Config config,
ExecutorService executor) {
+ BlobStoreConfig blobStoreConfig = new BlobStoreConfig(config);
+ String blobStoreManagerFactory =
blobStoreConfig.getBlobStoreManagerFactory();
+ BlobStoreManagerFactory factory =
ReflectionUtil.getObj(blobStoreManagerFactory, BlobStoreManagerFactory.class);
+ return factory.getRestoreBlobStoreManager(config, executor);
+ }
+
+ private static void updateRestoreTime(long startTime, SamzaContainerMetrics
samzaContainerMetrics,
+ TaskName taskInstance) {
+ long timeToRestore = System.currentTimeMillis() - startTime;
+ if (samzaContainerMetrics != null) {
+ Gauge taskGauge =
samzaContainerMetrics.taskStoreRestorationMetrics().getOrDefault(taskInstance,
null);
+
+ if (taskGauge != null) {
+ taskGauge.set(timeToRestore);
+ }
+ }
+ }
+
+ private static Boolean isUnwrappedExceptionDeletedException(Throwable ex) {
+ Throwable unwrappedException =
FutureUtil.unwrapExceptions(CompletionException.class,
+ FutureUtil.unwrapExceptions(SamzaException.class, ex));
+ return unwrappedException instanceof DeletedException;
+ }
+}
diff --git
a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java
b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java
index d9c9cd411..a22155261 100644
---
a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java
+++
b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java
@@ -57,6 +57,7 @@ import org.apache.samza.util.Clock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
public class ContainerStorageManagerUtil {
private static final Logger LOG =
LoggerFactory.getLogger(ContainerStorageManagerUtil.class);
@@ -316,7 +317,7 @@ public class ContainerStorageManagerUtil {
/**
* Returns a map of backend factory names to subset of provided storeNames
that should be restored using it.
* For CheckpointV1, only includes stores that should be restored using a
configured changelog.
- * For CheckpointV2, associates stores with the highest precedence
configured restore factory that has a SCM in
+ * For CheckpointV2, associates stores with the highest precedence
configured restore factory that has SCM in
* the checkpoint, or the highest precedence restore factory configured if
there are no SCMs in the checkpoint.
*/
public static Map<String, Set<String>> getBackendFactoryStoreNames(
diff --git
a/samza-core/src/test/java/org/apache/samza/storage/TestTaskStorageCommitManager.java
b/samza-core/src/test/java/org/apache/samza/storage/TestTaskStorageCommitManager.java
index ea00433c3..e766a351d 100644
---
a/samza-core/src/test/java/org/apache/samza/storage/TestTaskStorageCommitManager.java
+++
b/samza-core/src/test/java/org/apache/samza/storage/TestTaskStorageCommitManager.java
@@ -86,7 +86,7 @@ public class TestTaskStorageCommitManager {
ForkJoinPool.commonPool(), new StorageManagerUtil(), null, null);
when(checkpointManager.readLastCheckpoint(taskName)).thenReturn(checkpoint);
- cm.init();
+ cm.init(checkpoint);
verify(taskBackupManager1).init(eq(checkpoint));
verify(taskBackupManager2).init(eq(checkpoint));
}
@@ -105,7 +105,7 @@ public class TestTaskStorageCommitManager {
TaskStorageCommitManager cm = new TaskStorageCommitManager(task,
backupManagers, containerStorageManager,
Collections.emptyMap(), new Partition(1), null, new MapConfig(),
ForkJoinPool.commonPool(), new StorageManagerUtil(), null, null);
- cm.init();
+ cm.init(null);
verify(taskBackupManager1).init(eq(null));
verify(taskBackupManager2).init(eq(null));
}
@@ -130,7 +130,7 @@ public class TestTaskStorageCommitManager {
Collections.emptyMap(), new Partition(1), checkpointManager, new
MapConfig(),
ForkJoinPool.commonPool(), new StorageManagerUtil(), null, metrics);
when(checkpointManager.readLastCheckpoint(taskName)).thenReturn(checkpoint);
- cm.init();
+ cm.init(checkpoint);
verify(taskBackupManager1).init(eq(checkpoint));
verify(taskBackupManager2).init(eq(checkpoint));
@@ -220,7 +220,7 @@ public class TestTaskStorageCommitManager {
Collections.emptyMap(), new Partition(1), checkpointManager, new
MapConfig(),
ForkJoinPool.commonPool(), new StorageManagerUtil(), null, metrics);
when(checkpointManager.readLastCheckpoint(taskName)).thenReturn(checkpoint);
- cm.init();
+ cm.init(checkpoint);
verify(taskBackupManager1).init(eq(checkpoint));
verify(taskBackupManager2).init(eq(checkpoint));
@@ -243,7 +243,7 @@ public class TestTaskStorageCommitManager {
.thenReturn(CompletableFuture.completedFuture(factory2Checkpoints));
when(mockLIStore.checkpoint(newCheckpointId)).thenReturn(Optional.empty());
- cm.init();
+ cm.init(checkpoint);
cm.snapshot(newCheckpointId);
// Assert stores where flushed
@@ -295,7 +295,7 @@ public class TestTaskStorageCommitManager {
when(containerStorageManager.getAllStores(taskName)).thenReturn(storageEngines);
CheckpointId newCheckpointId = CheckpointId.create();
- cm.init();
+ cm.init(null);
cm.snapshot(newCheckpointId);
// Assert stores where flushed
@@ -457,7 +457,7 @@ public class TestTaskStorageCommitManager {
changelogSSP, kafkaChangelogSSPOffset.toString()
);
- commitManager.init();
+ commitManager.init(null);
// invoke persist to file system for v2 checkpoint
commitManager.writeCheckpointToStoreDirectories(new
CheckpointV1(offsetsJava));
@@ -564,7 +564,7 @@ public class TestTaskStorageCommitManager {
);
CheckpointV2 checkpoint = new CheckpointV2(newCheckpointId,
Collections.emptyMap(), Collections.singletonMap("factory", storeSCM));
- commitManager.init();
+ commitManager.init(null);
// invoke persist to file system
commitManager.writeCheckpointToStoreDirectories(checkpoint);
// Validate only durable and persisted stores are persisted
@@ -634,7 +634,7 @@ public class TestTaskStorageCommitManager {
changelogSSP, kafkaChangelogSSPOffset.toString()
);
- commitManager.init();
+ commitManager.init(null);
// invoke persist to file system for v2 checkpoint
commitManager.writeCheckpointToStoreDirectories(new
CheckpointV1(offsetsJava));
@@ -722,7 +722,7 @@ public class TestTaskStorageCommitManager {
changelogSSP, kafkaChangelogSSPOffset.toString()
);
- commitManager.init();
+ commitManager.init(null);
// invoke persist to file system for v1 checkpoint
commitManager.writeCheckpointToStoreDirectories(new
CheckpointV1(offsetsJava));
@@ -824,7 +824,7 @@ public class TestTaskStorageCommitManager {
changelogSSP, kafkaChangelogSSPOffset.toString()
);
- commitManager.init();
+ commitManager.init(null);
// invoke persist to file system for v2 checkpoint
commitManager.writeCheckpointToStoreDirectories(new
CheckpointV1(offsetsJava));
assertTrue(mockFileSystem.isEmpty());
@@ -870,7 +870,7 @@ public class TestTaskStorageCommitManager {
CheckpointV2 checkpoint = new CheckpointV2(CheckpointId.create(),
Collections.emptyMap(), Collections.singletonMap("factory", storeSCM));
doThrow(IOException.class).when(storageManagerUtil).writeCheckpointV2File(eq(tmpTestPath),
eq(checkpoint));
- commitManager.init();
+ commitManager.init(null);
// Should throw samza exception since writeCheckpointV2 failed
commitManager.writeCheckpointToStoreDirectories(checkpoint);
}
diff --git
a/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreBackupManager.java
b/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreBackupManager.java
index 7a0cdc0a3..4f35521dd 100644
---
a/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreBackupManager.java
+++
b/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreBackupManager.java
@@ -133,7 +133,13 @@ public class TestBlobStoreBackupManager {
// Mock - return snapshot index for blob id from test blob store map
ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
- when(blobStoreUtil.getSnapshotIndex(captor.capture(), any(Metadata.class)))
+ when(blobStoreUtil.getSnapshotIndex(captor.capture(), any(Metadata.class),
anyBoolean()))
+ .then((Answer<CompletableFuture<SnapshotIndex>>) invocation -> {
+ String blobId = invocation.getArgumentAt(0, String.class);
+ return CompletableFuture.completedFuture(testBlobStore.get(blobId));
+ });
+
+ when(blobStoreUtil.getSnapshotIndex(captor.capture(), any(Metadata.class),
anyBoolean()))
.then((Answer<CompletableFuture<SnapshotIndex>>) invocation -> {
String blobId = invocation.getArgumentAt(0, String.class);
return CompletableFuture.completedFuture(testBlobStore.get(blobId));
@@ -163,7 +169,8 @@ public class TestBlobStoreBackupManager {
// verify delete snapshot index blob called from init 0 times because
prevSnapshotMap returned from init is empty
// in case of null checkpoint.
verify(blobStoreUtil, times(0)).deleteSnapshotIndexBlob(anyString(),
any(Metadata.class));
- when(blobStoreUtil.getStoreSnapshotIndexes(anyString(), anyString(),
anyString(), any(Checkpoint.class),
anySetOf(String.class))).thenCallRealMethod();
+ when(blobStoreUtil.getStoreSnapshotIndexes(anyString(), anyString(),
anyString(), any(Checkpoint.class),
+ anySetOf(String.class), anyBoolean())).thenCallRealMethod();
// init called with Checkpoint V1 -> unsupported
Checkpoint checkpoint = new CheckpointV1(new HashMap<>());
@@ -292,7 +299,10 @@ public class TestBlobStoreBackupManager {
Checkpoint checkpoint =
new CheckpointV2(checkpointId, new HashMap<>(),
ImmutableMap.of(BlobStoreStateBackendFactory.class.getName(),
previousCheckpoints));
- when(blobStoreUtil.getStoreSnapshotIndexes(anyString(), anyString(),
anyString(), any(Checkpoint.class),
anySetOf(String.class))).thenCallRealMethod();
+ when(blobStoreUtil.getStoreSnapshotIndexes(anyString(), anyString(),
anyString(), any(Checkpoint.class),
+ anySetOf(String.class), anyBoolean())).thenCallRealMethod();
+ when(blobStoreUtil.getStoreSnapshotIndexes(anyString(), anyString(),
anyString(), any(Checkpoint.class),
+ anySetOf(String.class), anyBoolean())).thenCallRealMethod();
blobStoreBackupManager.init(checkpoint);
// mock: set task store dir to return corresponding test local store and
create checkpoint dir
diff --git
a/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreRestoreManager.java
b/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreRestoreManager.java
index ddc0c8e19..67d62759e 100644
---
a/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreRestoreManager.java
+++
b/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreRestoreManager.java
@@ -88,16 +88,13 @@ public class TestBlobStoreRestoreManager {
when(mockSnapshotIndex.getDirIndex()).thenReturn(dirIndex);
BlobStoreUtil blobStoreUtil = mock(BlobStoreUtil.class);
- when(blobStoreUtil.cleanUpDir(any(DirIndex.class),
any(Metadata.class))).thenReturn(CompletableFuture.completedFuture(null));
- when(blobStoreUtil.deleteDir(any(DirIndex.class),
any(Metadata.class))).thenReturn(CompletableFuture.completedFuture(null));
- when(blobStoreUtil.deleteSnapshotIndexBlob(anyString(),
any(Metadata.class))).thenReturn(CompletableFuture.completedFuture(null));
+ when(blobStoreUtil.cleanSnapshotIndex(anyString(),
any(SnapshotIndex.class), any(Metadata.class)))
+ .thenReturn(CompletableFuture.completedFuture(null));
BlobStoreRestoreManager.deleteUnusedStoresFromBlobStore(
jobName, jobId, taskName, storageConfig, blobStoreConfig,
initialStoreSnapshotIndexes, blobStoreUtil, EXECUTOR);
- verify(blobStoreUtil, times(1)).cleanUpDir(eq(dirIndex),
any(Metadata.class));
- verify(blobStoreUtil, times(1)).deleteDir(eq(dirIndex),
any(Metadata.class));
- verify(blobStoreUtil, times(1)).deleteSnapshotIndexBlob(eq(blobId),
any(Metadata.class));
+ verify(blobStoreUtil, times(1)).cleanSnapshotIndex(eq(blobId),
any(SnapshotIndex.class), any(Metadata.class));
}
@@ -201,16 +198,16 @@ public class TestBlobStoreRestoreManager {
DirDiffUtil dirDiffUtil = mock(DirDiffUtil.class);
// return immediately without restoring.
- when(blobStoreUtil.restoreDir(eq(storeDir.toFile()), eq(dirIndex),
any(Metadata.class)))
+ when(blobStoreUtil.restoreDir(eq(storeDir.toFile()), eq(dirIndex),
any(Metadata.class), anyBoolean()))
.thenReturn(CompletableFuture.completedFuture(null));
when(dirDiffUtil.areSameDir(anySet(), anyBoolean())).thenReturn((arg1,
arg2) -> true);
BlobStoreRestoreManager.restoreStores(jobName, jobId, taskName,
storesToRestore, prevStoreSnapshotIndexes,
loggedBaseDir.toFile(), storageConfig, metrics,
- storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR);
+ storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR, false);
// verify that the store directory restore was called and skipped (i.e.
shouldRestore == true)
- verify(blobStoreUtil, times(1)).restoreDir(eq(storeDir.toFile()),
eq(dirIndex), any(Metadata.class));
+ verify(blobStoreUtil, times(1)).restoreDir(eq(storeDir.toFile()),
eq(dirIndex), any(Metadata.class), anyBoolean());
// verify that the store directory was deleted prior to restore
// (should still not exist at the end since restore is no-op)
assertFalse(storeDir.toFile().exists());
@@ -254,15 +251,15 @@ public class TestBlobStoreRestoreManager {
when(dirDiffUtil.areSameDir(anySet(), anyBoolean())).thenReturn((arg1,
arg2) -> true);
// return immediately without restoring.
- when(blobStoreUtil.restoreDir(eq(storeDir.toFile()), eq(dirIndex),
any(Metadata.class)))
+ when(blobStoreUtil.restoreDir(eq(storeDir.toFile()), eq(dirIndex),
any(Metadata.class), anyBoolean()))
.thenReturn(CompletableFuture.completedFuture(null));
BlobStoreRestoreManager.restoreStores(jobName, jobId, taskName,
storesToRestore, prevStoreSnapshotIndexes,
loggedBaseDir.toFile(), storageConfig, metrics,
- storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR);
+ storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR, false);
// verify that the store directory restore was called and skipped (i.e.
shouldRestore == true)
- verify(blobStoreUtil, times(1)).restoreDir(eq(storeDir.toFile()),
eq(dirIndex), any(Metadata.class));
+ verify(blobStoreUtil, times(1)).restoreDir(eq(storeDir.toFile()),
eq(dirIndex), any(Metadata.class), anyBoolean());
// verify that the checkpoint directories were deleted prior to restore
(should not exist at the end)
assertFalse(storeCheckpointDir1.toFile().exists());
assertFalse(storeCheckpointDir2.toFile().exists());
@@ -310,15 +307,15 @@ public class TestBlobStoreRestoreManager {
// ensures shouldRestore is not called
when(dirDiffUtil.areSameDir(anySet(), anyBoolean())).thenReturn((arg1,
arg2) -> true);
// return immediately without restoring.
- when(blobStoreUtil.restoreDir(eq(storeDir.toFile()), eq(dirIndex),
any(Metadata.class)))
+ when(blobStoreUtil.restoreDir(eq(storeDir.toFile()), eq(dirIndex),
any(Metadata.class), anyBoolean()))
.thenReturn(CompletableFuture.completedFuture(null));
BlobStoreRestoreManager.restoreStores(jobName, jobId, taskName,
storesToRestore, prevStoreSnapshotIndexes,
loggedBaseDir.toFile(), storageConfig, metrics,
- storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR);
+ storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR, false);
// verify that the store directory restore was not called (should have
restored from checkpoint dir)
- verify(blobStoreUtil, times(0)).restoreDir(eq(storeDir.toFile()),
eq(dirIndex), any(Metadata.class));
+ verify(blobStoreUtil, times(0)).restoreDir(eq(storeDir.toFile()),
eq(dirIndex), any(Metadata.class), anyBoolean());
// verify that the checkpoint dir was renamed to store dir
assertFalse(storeCheckpointDir.toFile().exists());
assertTrue(storeDir.toFile().exists());
@@ -352,11 +349,11 @@ public class TestBlobStoreRestoreManager {
BlobStoreRestoreManager.restoreStores(jobName, jobId, taskName,
storesToRestore, prevStoreSnapshotIndexes,
loggedBaseDir.toFile(), storageConfig, metrics,
- storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR);
+ storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR, false);
// verify that we checked the previously checkpointed SCMs.
verify(prevStoreSnapshotIndexes, times(1)).containsKey(eq("newStoreName"));
// verify that the store directory restore was never called
- verify(blobStoreUtil, times(0)).restoreDir(any(File.class),
any(DirIndex.class), any(Metadata.class));
+ verify(blobStoreUtil, times(0)).restoreDir(any(File.class),
any(DirIndex.class), any(Metadata.class), anyBoolean());
}
}
diff --git
a/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestBlobStoreUtil.java
b/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestBlobStoreUtil.java
index 37512eaf3..5b79315b6 100644
---
a/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestBlobStoreUtil.java
+++
b/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestBlobStoreUtil.java
@@ -610,7 +610,7 @@ public class TestBlobStoreUtil {
when(mockDirIndex.getFilesPresent()).thenReturn(ImmutableList.of(mockFileIndex));
BlobStoreManager mockBlobStoreManager = mock(BlobStoreManager.class);
- when(mockBlobStoreManager.get(anyString(), any(OutputStream.class),
any(Metadata.class))).thenAnswer(
+ when(mockBlobStoreManager.get(anyString(), any(OutputStream.class),
any(Metadata.class), any(Boolean.class))).thenAnswer(
(Answer<CompletionStage<Void>>) invocationOnMock -> {
String blobId = invocationOnMock.getArgumentAt(0, String.class);
OutputStream outputStream = invocationOnMock.getArgumentAt(1,
OutputStream.class);
@@ -623,7 +623,7 @@ public class TestBlobStoreUtil {
});
BlobStoreUtil blobStoreUtil = new BlobStoreUtil(mockBlobStoreManager,
EXECUTOR, blobStoreConfig, null, null);
- blobStoreUtil.restoreDir(restoreDirBasePath.toFile(), mockDirIndex,
metadata).join();
+ blobStoreUtil.restoreDir(restoreDirBasePath.toFile(), mockDirIndex,
metadata, false).join();
assertTrue(
new DirDiffUtil().areSameDir(Collections.emptySet(),
false).test(restoreDirBasePath.toFile(), mockDirIndex));
@@ -663,7 +663,7 @@ public class TestBlobStoreUtil {
when(mockDirIndex.getFilesPresent()).thenReturn(ImmutableList.of(mockFileIndex));
BlobStoreManager mockBlobStoreManager = mock(BlobStoreManager.class);
- when(mockBlobStoreManager.get(anyString(), any(OutputStream.class),
any(Metadata.class))).thenAnswer(
+ when(mockBlobStoreManager.get(anyString(), any(OutputStream.class),
any(Metadata.class), any(Boolean.class))).thenAnswer(
(Answer<CompletionStage<Void>>) invocationOnMock -> { // first try,
retriable error
String blobId = invocationOnMock.getArgumentAt(0, String.class);
OutputStream outputStream = invocationOnMock.getArgumentAt(1,
OutputStream.class);
@@ -681,7 +681,7 @@ public class TestBlobStoreUtil {
});
BlobStoreUtil blobStoreUtil = new BlobStoreUtil(mockBlobStoreManager,
EXECUTOR, blobStoreConfig, null, null);
- blobStoreUtil.restoreDir(restoreDirBasePath.toFile(), mockDirIndex,
metadata).join();
+ blobStoreUtil.restoreDir(restoreDirBasePath.toFile(), mockDirIndex,
metadata, false).join();
assertTrue(
new DirDiffUtil().areSameDir(Collections.emptySet(),
false).test(restoreDirBasePath.toFile(), mockDirIndex));
@@ -721,7 +721,7 @@ public class TestBlobStoreUtil {
when(mockDirIndex.getFilesPresent()).thenReturn(ImmutableList.of(mockFileIndex));
BlobStoreManager mockBlobStoreManager = mock(BlobStoreManager.class);
- when(mockBlobStoreManager.get(anyString(), any(OutputStream.class),
any(Metadata.class))).thenReturn(
+ when(mockBlobStoreManager.get(anyString(), any(OutputStream.class),
any(Metadata.class), any(Boolean.class))).thenReturn(
FutureUtil.failedFuture(new IllegalArgumentException())) // non
retriable error
.thenAnswer((Answer<CompletionStage<Void>>) invocationOnMock -> {
String blobId = invocationOnMock.getArgumentAt(0, String.class);
@@ -735,7 +735,7 @@ public class TestBlobStoreUtil {
BlobStoreUtil blobStoreUtil = new BlobStoreUtil(mockBlobStoreManager,
EXECUTOR, blobStoreConfig, null, null);
try {
- blobStoreUtil.restoreDir(restoreDirBasePath.toFile(), mockDirIndex,
metadata).join();
+ blobStoreUtil.restoreDir(restoreDirBasePath.toFile(), mockDirIndex,
metadata, false).join();
fail("Should have failed on non-retriable errors during file restore");
} catch (CompletionException e) {
assertTrue(e.getCause() instanceof IllegalArgumentException);
@@ -750,7 +750,7 @@ public class TestBlobStoreUtil {
String localSnapshotFiles = "[a, b, z/1, y/1, p/m/1, q/n/1]";
Path localSnapshot = BlobStoreTestUtil.createLocalDir(localSnapshotFiles);
BlobStoreManager mockBlobStoreManager = mock(BlobStoreManager.class);
- when(mockBlobStoreManager.get(anyString(), any(OutputStream.class),
any(Metadata.class))).thenAnswer(
+ when(mockBlobStoreManager.get(anyString(), any(OutputStream.class),
any(Metadata.class), any(Boolean.class))).thenAnswer(
(Answer<CompletionStage<Void>>) invocationOnMock -> {
String blobId = invocationOnMock.getArgumentAt(0, String.class);
OutputStream outputStream = invocationOnMock.getArgumentAt(1,
OutputStream.class);
@@ -775,7 +775,7 @@ public class TestBlobStoreUtil {
DirIndex dirIndex = BlobStoreTestUtil.createDirIndex(prevSnapshotFiles);
BlobStoreManager mockBlobStoreManager = mock(BlobStoreManager.class);
- when(mockBlobStoreManager.get(anyString(), any(OutputStream.class),
any(Metadata.class))).thenAnswer(
+ when(mockBlobStoreManager.get(anyString(), any(OutputStream.class),
any(Metadata.class), any(Boolean.class))).thenAnswer(
(Answer<CompletionStage<Void>>) invocationOnMock -> {
String blobId = invocationOnMock.getArgumentAt(0, String.class);
OutputStream outputStream = invocationOnMock.getArgumentAt(1,
OutputStream.class);
@@ -786,7 +786,7 @@ public class TestBlobStoreUtil {
Path restoreDirBasePath =
Files.createTempDirectory(BlobStoreTestUtil.TEMP_DIR_PREFIX);
BlobStoreUtil blobStoreUtil = new BlobStoreUtil(mockBlobStoreManager,
EXECUTOR, blobStoreConfig, null, null);
- blobStoreUtil.restoreDir(restoreDirBasePath.toFile(), dirIndex,
metadata).join();
+ blobStoreUtil.restoreDir(restoreDirBasePath.toFile(), dirIndex, metadata,
false).join();
assertTrue(new DirDiffUtil().areSameDir(Collections.emptySet(),
false).test(restoreDirBasePath.toFile(), dirIndex));
}
@@ -800,7 +800,8 @@ public class TestBlobStoreUtil {
BlobStoreUtil blobStoreUtil =
new BlobStoreUtil(mock(BlobStoreManager.class),
MoreExecutors.newDirectExecutorService(), blobStoreConfig, null, null);
Map<String, Pair<String, SnapshotIndex>> snapshotIndexes =
- blobStoreUtil.getStoreSnapshotIndexes("testJobName", "testJobId",
"taskName", null, new HashSet<>());
+ blobStoreUtil.getStoreSnapshotIndexes("testJobName", "testJobId",
"taskName",
+ null, new HashSet<>(), false);
assertTrue(snapshotIndexes.isEmpty());
}
@@ -810,7 +811,8 @@ public class TestBlobStoreUtil {
BlobStoreUtil blobStoreUtil =
new BlobStoreUtil(mock(BlobStoreManager.class),
MoreExecutors.newDirectExecutorService(), blobStoreConfig, null, null);
Map<String, Pair<String, SnapshotIndex>> prevSnapshotIndexes =
- blobStoreUtil.getStoreSnapshotIndexes("testJobName", "testJobId",
"taskName", mockCheckpoint, new HashSet<>());
+ blobStoreUtil.getStoreSnapshotIndexes("testJobName", "testJobId",
"taskName",
+ mockCheckpoint, new HashSet<>(), false);
assertEquals(prevSnapshotIndexes.size(), 0);
}
@@ -824,7 +826,8 @@ public class TestBlobStoreUtil {
BlobStoreUtil blobStoreUtil =
new BlobStoreUtil(mock(BlobStoreManager.class),
MoreExecutors.newDirectExecutorService(), blobStoreConfig, null, null);
Map<String, Pair<String, SnapshotIndex>> snapshotIndexes =
- blobStoreUtil.getStoreSnapshotIndexes("testJobName", "testJobId",
"taskName", mockCheckpoint, new HashSet<>());
+ blobStoreUtil.getStoreSnapshotIndexes("testJobName", "testJobId",
"taskName",
+ mockCheckpoint, new HashSet<>(), false);
assertTrue(snapshotIndexes.isEmpty());
}
@@ -838,7 +841,8 @@ public class TestBlobStoreUtil {
BlobStoreUtil blobStoreUtil =
new BlobStoreUtil(mock(BlobStoreManager.class),
MoreExecutors.newDirectExecutorService(), blobStoreConfig, null, null);
Map<String, Pair<String, SnapshotIndex>> snapshotIndexes =
- blobStoreUtil.getStoreSnapshotIndexes("testJobName", "testJobId",
"taskName", mockCheckpoint, new HashSet<>());
+ blobStoreUtil.getStoreSnapshotIndexes("testJobName", "testJobId",
"taskName",
+ mockCheckpoint, new HashSet<>(), false);
assertTrue(snapshotIndexes.isEmpty());
}
@@ -849,36 +853,11 @@ public class TestBlobStoreUtil {
Set<String> storesToBackupOrRestore = new HashSet<>();
storesToBackupOrRestore.add("storeName");
BlobStoreUtil mockBlobStoreUtil = mock(BlobStoreUtil.class);
- when(mockBlobStoreUtil.getSnapshotIndex(anyString(),
any(Metadata.class))).thenThrow(new RuntimeException());
+ when(mockBlobStoreUtil.getSnapshotIndex(anyString(), any(Metadata.class),
anyBoolean())).thenThrow(new RuntimeException());
when(mockBlobStoreUtil.getStoreSnapshotIndexes(anyString(), anyString(),
anyString(),
- any(Checkpoint.class), anySetOf(String.class))).thenCallRealMethod();
- mockBlobStoreUtil.getStoreSnapshotIndexes("testJobName", "testJobId",
"taskName", checkpoint, storesToBackupOrRestore);
- }
-
- @Test
- public void testGetSSISkipsStoresWithSnapshotIndexAlreadyDeleted() {
- String store = "storeName1";
- String otherStore = "storeName2";
- Checkpoint checkpoint =
createCheckpointV2(BlobStoreStateBackendFactory.class.getName(),
- ImmutableMap.of(store, "snapshotIndexBlobId1", otherStore,
"snapshotIndexBlobId2"));
- Set<String> storesToBackupOrRestore = new HashSet<>();
- storesToBackupOrRestore.add(store);
- storesToBackupOrRestore.add(otherStore);
- SnapshotIndex store1SnapshotIndex = mock(SnapshotIndex.class);
- BlobStoreUtil mockBlobStoreUtil = mock(BlobStoreUtil.class);
-
- CompletableFuture<SnapshotIndex> failedFuture =
FutureUtil.failedFuture(new DeletedException());
- when(mockBlobStoreUtil.getSnapshotIndex(eq("snapshotIndexBlobId1"),
any(Metadata.class))).thenReturn(
- CompletableFuture.completedFuture(store1SnapshotIndex));
- when(mockBlobStoreUtil.getSnapshotIndex(eq("snapshotIndexBlobId2"),
any(Metadata.class))).thenReturn(failedFuture);
- when(mockBlobStoreUtil.getStoreSnapshotIndexes(anyString(), anyString(),
anyString(),
- any(Checkpoint.class), anySetOf(String.class))).thenCallRealMethod();
-
- Map<String, Pair<String, SnapshotIndex>> snapshotIndexes =
- mockBlobStoreUtil.getStoreSnapshotIndexes("testJobName", "testJobId",
"taskName", checkpoint, storesToBackupOrRestore);
- assertEquals(1, snapshotIndexes.size());
- assertEquals("snapshotIndexBlobId1",
snapshotIndexes.get("storeName1").getLeft());
- assertEquals(store1SnapshotIndex,
snapshotIndexes.get("storeName1").getRight());
+ any(Checkpoint.class), anySetOf(String.class),
any(Boolean.class))).thenCallRealMethod();
+ mockBlobStoreUtil.getStoreSnapshotIndexes("testJobName", "testJobId",
"taskName",
+ checkpoint, storesToBackupOrRestore, false);
}
@Test
@@ -893,15 +872,16 @@ public class TestBlobStoreUtil {
SnapshotIndex store1SnapshotIndex = mock(SnapshotIndex.class);
BlobStoreUtil mockBlobStoreUtil = mock(BlobStoreUtil.class);
when(mockBlobStoreUtil.getStoreSnapshotIndexes(anyString(), anyString(),
anyString(),
- any(Checkpoint.class), anySetOf(String.class))).thenCallRealMethod();
+ any(Checkpoint.class), anySetOf(String.class),
anyBoolean())).thenCallRealMethod();
RuntimeException nonIgnoredException = new RuntimeException();
CompletableFuture<SnapshotIndex> failedFuture =
FutureUtil.failedFuture(nonIgnoredException);
- when(mockBlobStoreUtil.getSnapshotIndex(eq("snapshotIndexBlobId1"),
any(Metadata.class))).thenReturn(
+ when(mockBlobStoreUtil.getSnapshotIndex(eq("snapshotIndexBlobId1"),
any(Metadata.class), anyBoolean())).thenReturn(
FutureUtil.failedFuture(new DeletedException())); // should fail even
if some errors are ignored
- when(mockBlobStoreUtil.getSnapshotIndex(eq("snapshotIndexBlobId2"),
any(Metadata.class))).thenReturn(failedFuture);
+ when(mockBlobStoreUtil.getSnapshotIndex(eq("snapshotIndexBlobId2"),
any(Metadata.class), anyBoolean())).thenReturn(failedFuture);
try {
- mockBlobStoreUtil.getStoreSnapshotIndexes("testJobName", "testJobId",
"taskName", checkpoint, storesToBackupOrRestore);
+ mockBlobStoreUtil.getStoreSnapshotIndexes("testJobName", "testJobId",
"taskName",
+ checkpoint, storesToBackupOrRestore, false);
fail("Should have thrown an exception");
} catch (Exception e) {
Throwable cause =
@@ -925,21 +905,22 @@ public class TestBlobStoreUtil {
BlobStoreUtil mockBlobStoreUtil = mock(BlobStoreUtil.class);
- when(mockBlobStoreUtil.getSnapshotIndex(eq(storeSnapshotIndexBlobId),
any(Metadata.class))).thenReturn(
+ when(mockBlobStoreUtil.getSnapshotIndex(eq(storeSnapshotIndexBlobId),
any(Metadata.class), any(Boolean.class))).thenReturn(
CompletableFuture.completedFuture(mockStoreSnapshotIndex));
- when(mockBlobStoreUtil.getSnapshotIndex(eq(otherStoreSnapshotIndexBlobId),
any(Metadata.class))).thenReturn(
+ when(mockBlobStoreUtil.getSnapshotIndex(eq(otherStoreSnapshotIndexBlobId),
any(Metadata.class), any(Boolean.class))).thenReturn(
CompletableFuture.completedFuture(mockOtherStooreSnapshotIndex));
when(mockBlobStoreUtil.getStoreSnapshotIndexes(anyString(), anyString(),
anyString(),
- any(Checkpoint.class), anySetOf(String.class))).thenCallRealMethod();
+ any(Checkpoint.class), anySetOf(String.class),
any(Boolean.class))).thenCallRealMethod();
Map<String, Pair<String, SnapshotIndex>> snapshotIndexes =
- mockBlobStoreUtil.getStoreSnapshotIndexes("testJobName", "testJobId",
"taskName", checkpoint, storesToBackupOrRestore);
+ mockBlobStoreUtil.getStoreSnapshotIndexes("testJobName", "testJobId",
"taskName",
+ checkpoint, storesToBackupOrRestore, false);
assertEquals(storeSnapshotIndexBlobId,
snapshotIndexes.get(storeName).getKey());
assertEquals(mockStoreSnapshotIndex,
snapshotIndexes.get(storeName).getValue());
assertEquals(otherStoreSnapshotIndexBlobId,
snapshotIndexes.get(otherStoreName).getKey());
assertEquals(mockOtherStooreSnapshotIndex,
snapshotIndexes.get(otherStoreName).getValue());
- verify(mockBlobStoreUtil, times(2)).getSnapshotIndex(anyString(),
any(Metadata.class));
+ verify(mockBlobStoreUtil, times(2)).getSnapshotIndex(anyString(),
any(Metadata.class), any(Boolean.class));
}
@Test
@@ -952,15 +933,17 @@ public class TestBlobStoreUtil {
CheckpointV2 checkpoint =
createCheckpointV2(BlobStoreStateBackendFactory.class.getName(),
ImmutableMap.of(store, "1", anotherStore, "2", oneMoreStore, "3"));
BlobStoreUtil mockBlobStoreUtil = mock(BlobStoreUtil.class);
- when(mockBlobStoreUtil.getSnapshotIndex(any(String.class),
any(Metadata.class)))
+ when(mockBlobStoreUtil.getSnapshotIndex(any(String.class),
any(Metadata.class), anyBoolean()))
.thenReturn(CompletableFuture.completedFuture(mockStoreSnapshotIndex));
when(mockBlobStoreUtil.getStoreSnapshotIndexes(anyString(), anyString(),
anyString(),
- any(Checkpoint.class), anySetOf(String.class))).thenCallRealMethod();
+ any(Checkpoint.class), anySetOf(String.class),
anyBoolean())).thenCallRealMethod();
Map<String, Pair<String, SnapshotIndex>> snapshotIndexes =
- mockBlobStoreUtil.getStoreSnapshotIndexes("testJobName", "testJobId",
"taskName", checkpoint, storesToBackupOrRestore);
+ mockBlobStoreUtil.getStoreSnapshotIndexes("testJobName", "testJobId",
"taskName",
+ checkpoint, storesToBackupOrRestore, false);
- verify(mockBlobStoreUtil,
times(storesToBackupOrRestore.size())).getSnapshotIndex(anyString(),
any(Metadata.class));
+ verify(mockBlobStoreUtil, times(storesToBackupOrRestore.size()))
+ .getSnapshotIndex(anyString(), any(Metadata.class), anyBoolean());
}
/**
diff --git
a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index 6b2154e00..d09ad0758 100644
---
a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++
b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -25,6 +25,7 @@ import java.util
import java.util.concurrent.atomic.AtomicReference
import javax.servlet.http.{HttpServlet, HttpServletRequest,
HttpServletResponse}
import org.apache.samza.Partition
+import org.apache.samza.checkpoint.Checkpoint
import org.apache.samza.config.{ClusterManagerConfig, Config, MapConfig}
import org.apache.samza.context.{ApplicationContainerContext, ContainerContext}
import org.apache.samza.coordinator.JobModelManager
@@ -90,7 +91,7 @@ class TestSamzaContainer extends AssertionsForJUnit with
MockitoSugar {
@Test
def testExceptionInTaskInitShutsDownTask() {
- when(this.taskInstance.initTask).thenThrow(new RuntimeException("Trigger a
shutdown, please."))
+ when(this.taskInstance.initTask(any[Option[Checkpoint]])).thenThrow(new
RuntimeException("Trigger a shutdown, please."))
this.samzaContainer.run
@@ -105,7 +106,7 @@ class TestSamzaContainer extends AssertionsForJUnit with
MockitoSugar {
@Test
def testErrorInTaskInitShutsDownTask(): Unit = {
- when(this.taskInstance.initTask).thenThrow(new NoSuchMethodError("Trigger
a shutdown, please."))
+ when(this.taskInstance.initTask(any[Option[Checkpoint]])).thenThrow(new
NoSuchMethodError("Trigger a shutdown, please."))
this.samzaContainer.run
diff --git
a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index 08b082ca2..6afec52e7 100644
---
a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++
b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -186,7 +186,7 @@ class TestTaskInstance extends AssertionsForJUnit with
MockitoSugar {
@Test
def testInitTask(): Unit = {
- this.taskInstance.initTask
+ this.taskInstance.initTask(Some(mock[Checkpoint]))
val contextCaptor = ArgumentCaptor.forClass(classOf[Context])
verify(this.task).init(contextCaptor.capture())
@@ -221,7 +221,7 @@ class TestTaskInstance extends AssertionsForJUnit with
MockitoSugar {
null
}
})
- taskInstance.initTask
+ taskInstance.initTask(Some(mock[Checkpoint]))
verify(this.offsetManager).setStartingOffset(TASK_NAME,
SYSTEM_STREAM_PARTITION, "10")
}
@@ -1011,7 +1011,7 @@ class TestTaskInstance extends AssertionsForJUnit with
MockitoSugar {
@Test
def testNoApplicationTaskContextFactoryProvided() {
setupTaskInstance(None)
- this.taskInstance.initTask
+ this.taskInstance.initTask(Some(mock[Checkpoint]))
this.taskInstance.shutdownTask
verifyZeroInteractions(this.applicationTaskContext)
}
diff --git
a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
index 15785644e..2b2b51885 100644
---
a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
+++
b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
@@ -21,7 +21,9 @@ package org.apache.samza.storage;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import java.io.ByteArrayOutputStream;
import java.io.File;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -30,8 +32,13 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.checkpoint.CheckpointId;
import org.apache.samza.checkpoint.CheckpointManager;
import org.apache.samza.checkpoint.CheckpointV1;
import org.apache.samza.checkpoint.CheckpointV2;
@@ -46,11 +53,23 @@ import org.apache.samza.container.TaskName;
import org.apache.samza.context.ContainerContext;
import org.apache.samza.context.JobContext;
import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
import org.apache.samza.job.model.TaskMode;
import org.apache.samza.job.model.TaskModel;
import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.StringSerdeFactory;
+import org.apache.samza.storage.blobstore.BlobStoreManager;
+import org.apache.samza.storage.blobstore.BlobStoreManagerFactory;
+import org.apache.samza.storage.blobstore.BlobStoreRestoreManager;
+import org.apache.samza.storage.blobstore.BlobStoreStateBackendFactory;
+import org.apache.samza.storage.blobstore.Metadata;
+import org.apache.samza.storage.blobstore.exceptions.DeletedException;
+import org.apache.samza.storage.blobstore.index.DirIndex;
+import org.apache.samza.storage.blobstore.index.SnapshotIndex;
+import org.apache.samza.storage.blobstore.index.SnapshotMetadata;
+import org.apache.samza.storage.blobstore.index.serde.SnapshotIndexSerde;
import org.apache.samza.system.SSPMetadataCache;
import org.apache.samza.system.StreamMetadataCache;
import org.apache.samza.system.SystemAdmin;
@@ -60,19 +79,26 @@ import org.apache.samza.system.SystemFactory;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.TaskInstanceCollector;
+import org.apache.samza.util.ReflectionUtil;
import org.apache.samza.util.SystemClock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
import scala.collection.JavaConverters;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
-
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ReflectionUtil.class,
ContainerStorageManagerRestoreUtil.class})
public class TestContainerStorageManager {
private static final String STORE_NAME = "store";
@@ -247,6 +273,9 @@ public class TestContainerStorageManager {
return CompletableFuture.completedFuture(null);
}).when(restoreManager).restore();
+ Map<TaskName, TaskInstanceCollector> taskInstanceCollectors = new
HashMap<>();
+ tasks.keySet().forEach(taskName -> taskInstanceCollectors.put(taskName,
mock(TaskInstanceCollector.class)));
+
// Create the container storage manager
this.containerStorageManager = new ContainerStorageManager(
checkpointManager,
@@ -264,7 +293,7 @@ public class TestContainerStorageManager {
mock(JobContext.class),
mockContainerContext,
ImmutableMap.of(StorageConfig.KAFKA_STATE_BACKEND_FACTORY,
backendFactory),
- mock(Map.class),
+ taskInstanceCollectors,
DEFAULT_LOGGED_STORE_BASE_DIR,
DEFAULT_STORE_BASE_DIR,
null,
@@ -502,6 +531,173 @@ public class TestContainerStorageManager {
factoriesToStores.get("factory2"));
}
+ @Test
+ public void testInitRecoversFromDeletedException() {
+ TaskName taskName = new TaskName("task");
+ Set<String> stores = Collections.singleton("store");
+
+ BlobStoreRestoreManager taskRestoreManager =
mock(BlobStoreRestoreManager.class);
+ Throwable deletedException = new SamzaException(new
CompletionException(new DeletedException("410 gone")));
+
doThrow(deletedException).when(taskRestoreManager).init(any(Checkpoint.class));
+
when(taskRestoreManager.restore()).thenReturn(CompletableFuture.completedFuture(null));
+
when(taskRestoreManager.restore(true)).thenReturn(CompletableFuture.completedFuture(null));
+
+ // mock ReflectionUtil.getObj
+ PowerMockito.mockStatic(ReflectionUtil.class);
+ BlobStoreManagerFactory blobStoreManagerFactory =
mock(BlobStoreManagerFactory.class);
+ BlobStoreManager blobStoreManager = mock(BlobStoreManager.class);
+ PowerMockito.when(ReflectionUtil.getObj(anyString(),
eq(BlobStoreManagerFactory.class)))
+ .thenReturn(blobStoreManagerFactory);
+ when(blobStoreManagerFactory.getRestoreBlobStoreManager(any(Config.class),
any(ExecutorService.class)))
+ .thenReturn(blobStoreManager);
+
+ Map<String, TaskRestoreManager> storeTaskRestoreManager =
ImmutableMap.of("store", taskRestoreManager);
+ CheckpointManager checkpointManager = mock(CheckpointManager.class);
+ JobContext jobContext = mock(JobContext.class);
+ when(jobContext.getJobModel()).thenReturn(mock(JobModel.class));
+ TaskModel taskModel = mock(TaskModel.class);
+ when(taskModel.getTaskName()).thenReturn(new TaskName("test"));
+ ContainerModel containerModel = mock(ContainerModel.class);
+ when(containerModel.getTasks()).thenReturn(ImmutableMap.of(taskName,
taskModel));
+ Checkpoint checkpoint = mock(CheckpointV2.class);
+ Map<TaskName, Checkpoint> taskCheckpoints = ImmutableMap.of(taskName,
checkpoint);
+ Map<TaskName, Map<String, Set<String>>> taskBackendFactoryToStoreNames =
+ ImmutableMap.of(taskName,
ImmutableMap.of(BlobStoreStateBackendFactory.class.getName(), stores));
+ Config config = new MapConfig(ImmutableMap.of("job.name", "test"),
ImmutableMap.of("stores.store.backup.factories",
BlobStoreStateBackendFactory.class.getName()));
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ SystemConsumer systemConsumer = mock(SystemConsumer.class);
+
+
ContainerStorageManagerRestoreUtil.initAndRestoreTaskInstances(ImmutableMap.of(taskName,
storeTaskRestoreManager),
+ samzaContainerMetrics, checkpointManager, jobContext, containerModel,
taskCheckpoints,
+ taskBackendFactoryToStoreNames, config, executor, new HashMap<>(),
null,
+ ImmutableMap.of("store", systemConsumer));
+
+ // verify init() is called twice -> once without getDeleted flag, once
with getDeleted flag
+ verify(taskRestoreManager, times(1)).init(any(Checkpoint.class));
+ verify(taskRestoreManager, times(1)).init(any(Checkpoint.class),
anyBoolean());
+ // verify restore is called with getDeletedFlag only
+ verify(taskRestoreManager, times(0)).restore();
+ verify(taskRestoreManager, times(1)).restore(true);
+ }
+
+ @Test
+ public void testRestoreRecoversFromDeletedException() throws Exception {
+ TaskName taskName = new TaskName("task");
+ Set<String> stores = Collections.singleton("store");
+
+ BlobStoreRestoreManager taskRestoreManager =
mock(BlobStoreRestoreManager.class);
+ doNothing().when(taskRestoreManager).init(any(Checkpoint.class));
+
+ CompletableFuture<Void> failedFuture =
CompletableFuture.completedFuture(null)
+ .thenCompose(v -> { throw new DeletedException("410 Gone"); });
+ when(taskRestoreManager.restore()).thenReturn(failedFuture);
+
when(taskRestoreManager.restore(true)).thenReturn(CompletableFuture.completedFuture(null));
+
+ Map<String, TaskRestoreManager> factoryToTaskRestoreManager =
ImmutableMap.of(
+ BlobStoreStateBackendFactory.class.getName(), taskRestoreManager);
+
+ JobContext jobContext = mock(JobContext.class);
+ TaskModel taskModel = mock(TaskModel.class);
+ when(taskModel.getTaskName()).thenReturn(taskName);
+ when(taskModel.getTaskMode()).thenReturn(TaskMode.Active);
+
+ ContainerModel containerModel = mock(ContainerModel.class);
+ when(containerModel.getTasks()).thenReturn(ImmutableMap.of(taskName,
taskModel));
+
+ CheckpointV2 checkpoint = mock(CheckpointV2.class);
+ when(checkpoint.getOffsets()).thenReturn(ImmutableMap.of());
+ when(checkpoint.getCheckpointId()).thenReturn(CheckpointId.create());
+ when(checkpoint.getStateCheckpointMarkers()).thenReturn(ImmutableMap.of(
+ KafkaChangelogStateBackendFactory.class.getName(), new HashMap<>()));
+
+ CheckpointManager checkpointManager = mock(CheckpointManager.class);
+
when(checkpointManager.readLastCheckpoint(taskName)).thenReturn(checkpoint);
+
+ String expectedOldBlobId = "oldBlobId";
+ when(checkpoint.getStateCheckpointMarkers()).thenReturn(ImmutableMap.of(
+ BlobStoreStateBackendFactory.class.getName(), ImmutableMap.of("store",
expectedOldBlobId)));
+
+ Map<TaskName, Checkpoint> taskCheckpoints = ImmutableMap.of(taskName,
checkpoint);
+
+ Map<TaskName, Map<String, Set<String>>> taskBackendFactoryToStoreNames =
+ ImmutableMap.of(taskName, ImmutableMap.of(
+ BlobStoreStateBackendFactory.class.getName(), stores));
+
+ Config config = new MapConfig(ImmutableMap.of(
+ "blob.store.manager.factory",
BlobStoreStateBackendFactory.class.getName(),
+ "job.name", "test"));
+
+ ExecutorService executor = Executors.newFixedThreadPool(5);
+
+ SystemConsumer systemConsumer = mock(SystemConsumer.class);
+
+ // mock ReflectionUtil.getObj
+ PowerMockito.mockStatic(ReflectionUtil.class);
+ BlobStoreManagerFactory blobStoreManagerFactory =
mock(BlobStoreManagerFactory.class);
+ BlobStoreManager blobStoreManager = mock(BlobStoreManager.class);
+ PowerMockito.when(ReflectionUtil.getObj(anyString(),
eq(BlobStoreManagerFactory.class)))
+ .thenReturn(blobStoreManagerFactory);
+ when(blobStoreManagerFactory.getRestoreBlobStoreManager(any(Config.class),
any(ExecutorService.class)))
+ .thenReturn(blobStoreManager);
+
+ // mock ContainerStorageManagerRestoreUtil.backupRecoveredStore
+ String expectedBlobId = "blobId";
+ PowerMockito.spy(ContainerStorageManagerRestoreUtil.class);
+
PowerMockito.doReturn(CompletableFuture.completedFuture(ImmutableMap.of("store",
expectedBlobId)))
+ .when(ContainerStorageManagerRestoreUtil.class, "backupRecoveredStore",
+ any(JobContext.class), any(ContainerModel.class),
any(Config.class),
+ any(TaskName.class), any(Set.class), any(Checkpoint.class),
any(File.class),
+ any(BlobStoreManager.class), any(MetricsRegistry.class),
any(ExecutorService.class));
+
+ SnapshotIndex snapshotIndex = new SnapshotIndex(System.currentTimeMillis(),
+ new SnapshotMetadata(CheckpointId.create(), "job", "test", "task",
"store"),
+ new DirIndex("test", new ArrayList<>(), new ArrayList<>(), new
ArrayList<>(), new ArrayList<>()),
+ Optional.empty());
+
+ ArgumentCaptor<String> getBlobIdCaptor =
ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor<ByteArrayOutputStream> outputStreamCaptor =
ArgumentCaptor.forClass(ByteArrayOutputStream.class);
+ when(blobStoreManager.get(getBlobIdCaptor.capture(),
outputStreamCaptor.capture(),
+ any(Metadata.class), any(Boolean.class)))
+ .thenAnswer(invocation -> {
+ ByteArrayOutputStream outputStream = outputStreamCaptor.getValue();
+ outputStream.write(new SnapshotIndexSerde().toBytes(snapshotIndex));
+ return CompletableFuture.completedFuture(null);
+ });
+
+ ArgumentCaptor<String> removeTTLBlobIdCaptor =
ArgumentCaptor.forClass(String.class);
+ when(blobStoreManager.removeTTL(removeTTLBlobIdCaptor.capture(),
any(Metadata.class)))
+ .thenAnswer(invocation -> CompletableFuture.completedFuture(null));
+
+ ArgumentCaptor<String> deleteBlobIdCaptor =
ArgumentCaptor.forClass(String.class);
+ when(blobStoreManager.delete(deleteBlobIdCaptor.capture(),
any(Metadata.class)))
+ .thenAnswer(invocation -> CompletableFuture.completedFuture(null));
+
+ Map<TaskName, Checkpoint> updatedTaskCheckpoints =
+
ContainerStorageManagerRestoreUtil.initAndRestoreTaskInstances(ImmutableMap.of(taskName,
factoryToTaskRestoreManager),
+ samzaContainerMetrics, checkpointManager, jobContext,
containerModel, taskCheckpoints,
+ taskBackendFactoryToStoreNames, config, executor, new HashMap<>(),
null,
+ ImmutableMap.of("store", systemConsumer)).get();
+
+ // verify taskCheckpoint is updated
+ assertNotEquals(((CheckpointV2)
taskCheckpoints.get(taskName)).getCheckpointId(),
+ ((CheckpointV2)
updatedTaskCheckpoints.get(taskName)).getCheckpointId());
+
+ // verify init is not retried with getDeleted
+ verify(taskRestoreManager, times(0)).init(any(Checkpoint.class),
anyBoolean());
+
+    // verify restore is called twice - once without getDeleted flag, once with
getDeleted flag
+ verify(taskRestoreManager, times(1)).restore();
+ verify(taskRestoreManager, times(1)).restore(true);
+
+    // verify the GET and removeTTL were called on the new SnapshotIndex
+ assertEquals(expectedBlobId, getBlobIdCaptor.getAllValues().get(0));
+ assertEquals(expectedBlobId, removeTTLBlobIdCaptor.getAllValues().get(0));
+
+    // verify that GET and delete were called on the old SnapshotIndex
+ assertEquals(expectedOldBlobId, getBlobIdCaptor.getAllValues().get(1));
+ assertEquals(expectedOldBlobId, deleteBlobIdCaptor.getValue());
+ }
+
@Test
public void getActiveTaskChangelogSystemStreams() {
Map<String, SystemStream> storeToChangelogSystemStreams =
diff --git
a/samza-kafka/src/test/java/org/apache/samza/storage/TestTransactionalStateTaskBackupManager.java
b/samza-kafka/src/test/java/org/apache/samza/storage/TestTransactionalStateTaskBackupManager.java
index 8bb93f769..ad869f0a8 100644
---
a/samza-kafka/src/test/java/org/apache/samza/storage/TestTransactionalStateTaskBackupManager.java
+++
b/samza-kafka/src/test/java/org/apache/samza/storage/TestTransactionalStateTaskBackupManager.java
@@ -72,7 +72,7 @@ public class TestTransactionalStateTaskBackupManager {
doReturn(mock(java.util.Map.class)).when(tsm).getNewestChangelogSSPOffsets(any(),
any(), any(), any());
// invoke Kafka flush
- commitManager.init();
+ commitManager.init(null);
commitManager.snapshot(CheckpointId.create());
// ensure that stores are flushed before we get newest changelog offsets
diff --git
a/samza-test/src/test/java/org/apache/samza/storage/MyStatefulApplication.java
b/samza-test/src/test/java/org/apache/samza/storage/MyStatefulApplication.java
index 9ab870649..1012646a9 100644
---
a/samza-test/src/test/java/org/apache/samza/storage/MyStatefulApplication.java
+++
b/samza-test/src/test/java/org/apache/samza/storage/MyStatefulApplication.java
@@ -175,10 +175,12 @@ public class MyStatefulApplication implements
TaskApplication {
public void init(Context context) {
storeNames.forEach(storeName -> {
KeyValueStore<String, String> store = (KeyValueStore<String, String>)
context.getTaskContext().getStore(storeName);
+ LOG.debug("For storename:{} received: {}", storeName, store);
stores.add(store); // any input messages will be written to all
'stores'
KeyValueIterator<String, String> storeEntries = store.all();
List<String> storeInitialContents = new ArrayList<>();
while (storeEntries.hasNext()) {
+ LOG.debug("INIT {} Store content. StoreInitialContent: {}",
storeName, storeInitialContents);
storeInitialContents.add(storeEntries.next().getValue());
}
initialStoreContents.put(storeName, storeInitialContents);
@@ -188,11 +190,14 @@ public class MyStatefulApplication implements
TaskApplication {
inMemoryStoreNames.forEach(storeName -> {
KeyValueStore<String, String> store =
(KeyValueStore<String, String>)
context.getTaskContext().getStore(storeName);
+ LOG.debug("For storename:{} received: {}", storeName, store);
stores.add(store); // any input messages will be written to all
'stores'.
KeyValueIterator<String, String> storeEntries = store.all();
List<String> storeInitialContents = new ArrayList<>();
+ LOG.debug("INIT InMemory Store content:{} ", storeName);
while (storeEntries.hasNext()) {
storeInitialContents.add(storeEntries.next().getValue());
+ LOG.debug("INIT InMemory Store content. StoreInitialContent: {}",
storeInitialContents);
}
initialInMemoryStoreContents.put(storeName, storeInitialContents);
storeEntries.close();
@@ -201,6 +206,7 @@ public class MyStatefulApplication implements
TaskApplication {
if (sideInputStoreName.isPresent()) {
KeyValueStore<String, String> sideInputStore =
(KeyValueStore<String, String>)
context.getTaskContext().getStore(sideInputStoreName.get());
+ LOG.debug("For storename:{} received: {}", sideInputStoreName,
sideInputStore);
KeyValueIterator<String, String> sideInputStoreEntries =
sideInputStore.all();
List<String> sideInputStoreInitialContents = new ArrayList<>();
while (sideInputStoreEntries.hasNext()) {
@@ -219,25 +225,38 @@ public class MyStatefulApplication implements
TaskApplication {
if (key.endsWith("crash_once")) { // endsWith allows :crash_once and
crash_once
if (!crashedOnce) {
+ LOG.debug("Process in my task: CrashOnce received.");
crashedOnce = true;
coordinator.shutdown(TaskCoordinator.RequestScope.CURRENT_TASK);
} else {
return;
}
} else if (key.endsWith("shutdown")) {
+ LOG.debug("Process in my task: Shutdown received.");
coordinator.shutdown(TaskCoordinator.RequestScope.CURRENT_TASK);
} else if (key.startsWith("-")) {
- stores.forEach(store -> store.delete(key.substring(1)));
+ stores.forEach(store -> {
+ LOG.debug("Process in my task: - received. Deleting: {} from {}",
key, store);
+ store.delete(key.substring(1));
+ });
} else if (key.startsWith(":")) {
// write the message and flush, but don't invoke commit later
String msg = key.substring(1);
- stores.forEach(store -> store.put(msg, msg));
+ stores.forEach(store -> {
+ LOG.debug("Process in my task: ':' received. Put {} in {}", msg,
store);
+ store.put(msg, msg);
+ });
} else {
- stores.forEach(store -> store.put(key, key));
+ stores.forEach(store -> {
+ LOG.debug("Process in my task: Adding key to store. Put {} in {}",
key, store);
+ store.put(key, key);
+ });
}
+ LOG.debug("Process in my task: Flush received.");
stores.forEach(KeyValueStore::flush);
if (!key.startsWith(":")) {
+ LOG.debug("Process in my task: ':' not received. Calling commit: {}",
TaskCoordinator.RequestScope.CURRENT_TASK);
coordinator.commit(TaskCoordinator.RequestScope.CURRENT_TASK);
}
}
diff --git
a/samza-test/src/test/java/org/apache/samza/storage/kv/BlobStoreStateBackendIntegrationTest.java
b/samza-test/src/test/java/org/apache/samza/storage/kv/BlobStoreStateBackendIntegrationTest.java
index 1779ac735..e79d1b86e 100644
---
a/samza-test/src/test/java/org/apache/samza/storage/kv/BlobStoreStateBackendIntegrationTest.java
+++
b/samza-test/src/test/java/org/apache/samza/storage/kv/BlobStoreStateBackendIntegrationTest.java
@@ -51,15 +51,21 @@ import org.apache.samza.storage.MyStatefulApplication;
import org.apache.samza.storage.SideInputsProcessor;
import org.apache.samza.storage.StorageManagerUtil;
import org.apache.samza.storage.blobstore.Metadata;
+import org.apache.samza.storage.blobstore.index.DirIndex;
+import org.apache.samza.storage.blobstore.index.FileBlob;
+import org.apache.samza.storage.blobstore.index.FileIndex;
import org.apache.samza.storage.blobstore.index.SnapshotIndex;
import org.apache.samza.storage.blobstore.index.serde.SnapshotIndexSerde;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.test.util.TestBlobStoreManager;
import org.apache.samza.util.FileUtil;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -124,6 +130,8 @@ public class BlobStoreStateBackendIntegrationTest extends
BaseStateBackendIntegr
private final boolean hostAffinity;
+ private static final Logger LOG =
LoggerFactory.getLogger(BlobStoreStateBackendIntegrationTest.class);
+
public BlobStoreStateBackendIntegrationTest(boolean hostAffinity) {
this.hostAffinity = hostAffinity;
}
@@ -131,6 +139,7 @@ public class BlobStoreStateBackendIntegrationTest extends
BaseStateBackendIntegr
@Before
@Override
public void setUp() {
+ LOG.debug("Starting setup");
super.setUp();
// reset static state shared with task between each parameterized iteration
MyStatefulApplication.resetTestState();
@@ -138,7 +147,15 @@ public class BlobStoreStateBackendIntegrationTest extends
BaseStateBackendIntegr
fileUtil.rm(new File(LOGGED_STORE_BASE_DIR)); // always clear local store
on startup
// no need to clear ledger dir since subdir of blob store base dir
fileUtil.rm(new File(BLOB_STORE_BASE_DIR)); // always clear local "blob
store" on startup
+ }
+ @After
+ @Override
+ public void tearDown() {
+ FileUtil fileUtil = new FileUtil();
+ fileUtil.rm(new File(LOGGED_STORE_BASE_DIR));
+ fileUtil.rm(new File(BLOB_STORE_BASE_DIR));
+ LOG.debug("Tear down complete");
}
@Test
@@ -197,6 +214,124 @@ public class BlobStoreStateBackendIntegrationTest extends
BaseStateBackendIntegr
verifyLedger(SIDE_INPUT_STORE_NAME, Optional.of(lastSideInputSnapshot),
hostAffinity, true, true);
}
+ @Test
+ public void stopDeleteBlobRun() {
+ List<String> inputMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2",
"97", "-97", ":98", ":99", ":crash_once");
+ List<String> sideInputMessagesOnInitialRun = Arrays.asList("1", "2", "3",
"4", "5", "6");
+ initialRun(
+ INPUT_SYSTEM,
+ INPUT_TOPIC,
+ SIDE_INPUT_TOPIC,
+ inputMessagesOnInitialRun,
+ sideInputMessagesOnInitialRun,
+ ImmutableSet.of(REGULAR_STORE_NAME),
+ Collections.emptyMap(),
+ ImmutableSet.of(IN_MEMORY_STORE_NAME),
+ ImmutableMap.of(IN_MEMORY_STORE_NAME, IN_MEMORY_STORE_CHANGELOG_TOPIC),
+ SIDE_INPUT_STORE_NAME,
+ Collections.emptyList(),
+ CONFIGS);
+
+ Pair<String, SnapshotIndex> lastRegularSnapshot =
+ verifyLedger(REGULAR_STORE_NAME, Optional.empty(), hostAffinity,
false, false);
+ Pair<String, SnapshotIndex> lastSideInputSnapshot =
+ verifyLedger(SIDE_INPUT_STORE_NAME, Optional.empty(), hostAffinity,
true,
+ false /* no side input offsets file will be present during initial
restore */);
+
+ // verifies transactional state too
+ List<String> inputMessagesBeforeSecondRun = Arrays.asList("4", "5", "5",
":shutdown");
+ List<String> sideInputMessagesBeforeSecondRun = Arrays.asList("7", "8",
"9");
+ List<String> expectedInitialStoreContentsOnSecondRun = Arrays.asList("1",
"2", "3");
+ // verifies that in-memory stores backed by changelogs work correctly
+ // (requires overriding store level state backends explicitly)
+ List<String> expectedInitialInMemoryStoreContentsOnSecondRun =
Arrays.asList("1", "2", "3");
+ List<String> expectedInitialSideInputStoreContentsOnSecondRun = new
ArrayList<>(sideInputMessagesOnInitialRun);
+
expectedInitialSideInputStoreContentsOnSecondRun.addAll(sideInputMessagesBeforeSecondRun);
+
+ deleteBlobFromLastCheckpoint(lastRegularSnapshot.getRight());
+
+ secondRun(
+ hostAffinity,
+ LOGGED_STORE_BASE_DIR,
+ INPUT_SYSTEM,
+ INPUT_TOPIC,
+ SIDE_INPUT_TOPIC,
+ inputMessagesBeforeSecondRun,
+ sideInputMessagesBeforeSecondRun,
+ ImmutableSet.of(REGULAR_STORE_NAME),
+ Collections.emptyMap(),
+ ImmutableSet.of(IN_MEMORY_STORE_NAME),
+ ImmutableMap.of(IN_MEMORY_STORE_NAME, IN_MEMORY_STORE_CHANGELOG_TOPIC),
+ SIDE_INPUT_STORE_NAME,
+ Collections.emptyList(),
+ expectedInitialStoreContentsOnSecondRun,
+ expectedInitialInMemoryStoreContentsOnSecondRun,
+ expectedInitialSideInputStoreContentsOnSecondRun,
+ CONFIGS);
+
+ verifyLedger(REGULAR_STORE_NAME, Optional.of(lastRegularSnapshot),
hostAffinity, false, false);
+ verifyLedger(SIDE_INPUT_STORE_NAME, Optional.of(lastSideInputSnapshot),
hostAffinity, true, true);
+ }
+
+ @Test
+ public void stopDeleteSnapshotIndexBlobRun() {
+ List<String> inputMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2",
"97", "-97", ":98", ":99", ":crash_once");
+ List<String> sideInputMessagesOnInitialRun = Arrays.asList("1", "2", "3",
"4", "5", "6");
+ initialRun(
+ INPUT_SYSTEM,
+ INPUT_TOPIC,
+ SIDE_INPUT_TOPIC,
+ inputMessagesOnInitialRun,
+ sideInputMessagesOnInitialRun,
+ ImmutableSet.of(REGULAR_STORE_NAME),
+ Collections.emptyMap(),
+ ImmutableSet.of(IN_MEMORY_STORE_NAME),
+ ImmutableMap.of(IN_MEMORY_STORE_NAME, IN_MEMORY_STORE_CHANGELOG_TOPIC),
+ SIDE_INPUT_STORE_NAME,
+ Collections.emptyList(),
+ CONFIGS);
+
+ Pair<String, SnapshotIndex> lastRegularSnapshot =
+ verifyLedger(REGULAR_STORE_NAME, Optional.empty(), hostAffinity,
false, false);
+ Pair<String, SnapshotIndex> lastSideInputSnapshot =
+ verifyLedger(SIDE_INPUT_STORE_NAME, Optional.empty(), hostAffinity,
true,
+ false /* no side input offsets file will be present during initial
restore */);
+
+ // verifies transactional state too
+ List<String> inputMessagesBeforeSecondRun = Arrays.asList("4", "5", "5",
":shutdown");
+ List<String> sideInputMessagesBeforeSecondRun = Arrays.asList("7", "8",
"9");
+ List<String> expectedInitialStoreContentsOnSecondRun = Arrays.asList("1",
"2", "3");
+ // verifies that in-memory stores backed by changelogs work correctly
+ // (requires overriding store level state backends explicitly)
+ List<String> expectedInitialInMemoryStoreContentsOnSecondRun =
Arrays.asList("1", "2", "3");
+ List<String> expectedInitialSideInputStoreContentsOnSecondRun = new
ArrayList<>(sideInputMessagesOnInitialRun);
+
expectedInitialSideInputStoreContentsOnSecondRun.addAll(sideInputMessagesBeforeSecondRun);
+
+ deleteLastSnapshotIndex(lastRegularSnapshot);
+
+ secondRun(
+ hostAffinity,
+ LOGGED_STORE_BASE_DIR,
+ INPUT_SYSTEM,
+ INPUT_TOPIC,
+ SIDE_INPUT_TOPIC,
+ inputMessagesBeforeSecondRun,
+ sideInputMessagesBeforeSecondRun,
+ ImmutableSet.of(REGULAR_STORE_NAME),
+ Collections.emptyMap(),
+ ImmutableSet.of(IN_MEMORY_STORE_NAME),
+ ImmutableMap.of(IN_MEMORY_STORE_NAME, IN_MEMORY_STORE_CHANGELOG_TOPIC),
+ SIDE_INPUT_STORE_NAME,
+ Collections.emptyList(),
+ expectedInitialStoreContentsOnSecondRun,
+ expectedInitialInMemoryStoreContentsOnSecondRun,
+ expectedInitialSideInputStoreContentsOnSecondRun,
+ CONFIGS);
+
+ verifyLedger(REGULAR_STORE_NAME, Optional.of(lastRegularSnapshot),
hostAffinity, false, false);
+ verifyLedger(SIDE_INPUT_STORE_NAME, Optional.of(lastSideInputSnapshot),
hostAffinity, true, true);
+ }
+
/**
* Verifies the ledger for TestBlobStoreManager.
* @param startingSnapshot snapshot file name and files present in snapshot
at the beginning of verification (from previous run), if any.
@@ -261,6 +396,53 @@ public class BlobStoreStateBackendIntegrationTest extends
BaseStateBackendIntegr
}
}
+ private void deleteLastSnapshotIndex(Pair<String, SnapshotIndex>
pathSnapshotIndexPair) {
+
deleteCheckpointFromBlobStore(pathSnapshotIndexPair.getRight().getDirIndex());
+ try {
+ LOG.debug("Deleted last SnapshotIndex: {}",
pathSnapshotIndexPair.getLeft());
+      Files.move(Paths.get(pathSnapshotIndexPair.getLeft()),
Paths.get(pathSnapshotIndexPair.getLeft() + "-DELETED")); // Simulate deletion by renaming with the -DELETED tombstone
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to delete file: " +
pathSnapshotIndexPair.getLeft(), e);
+ }
+ }
+
+ private void deleteBlobFromLastCheckpoint(SnapshotIndex snapshotIndex) {
+ LOG.debug("Deleting SnapshotIndex {} and all associated blobs",
snapshotIndex);
+ DirIndex dirIndex = snapshotIndex.getDirIndex();
+    // delete one blob per file - deleting the first blob of each file in the snapshot
+ List<FileIndex> files = dirIndex.getFilesPresent();
+ for (FileIndex file: files) {
+ FileBlob blob = file.getBlobs().get(0);
+ try {
+        Files.move(Paths.get(blob.getBlobId()), Paths.get(blob.getBlobId() +
"-DELETED")); // Simulate deletion by renaming with the -DELETED tombstone
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to delete file: " +
blob.getBlobId(), e);
+ }
+ }
+ }
+
+  /**
+   * Helper function to recursively delete all files from local blob store
+   * @param dirIndex root directory index whose files and subdirectories are renamed with the -DELETED suffix
+   */
+ private void deleteCheckpointFromBlobStore(DirIndex dirIndex) {
+ for (FileIndex file: dirIndex.getFilesPresent()) {
+ List<FileBlob> blobs = file.getBlobs();
+ for (FileBlob blob: blobs) {
+ String blobPath = blob.getBlobId();
+ try {
+ LOG.debug("Deleting blob: {}", blobPath);
+          Files.move(Paths.get(blobPath), Paths.get(blobPath + "-DELETED"));
// Simulate deletion by renaming with the -DELETED tombstone
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to delete file: " + blobPath, e);
+ }
+ }
+ }
+ for (DirIndex subdir: dirIndex.getSubDirsPresent()) {
+ deleteCheckpointFromBlobStore(subdir);
+ }
+ }
+
static class MySideInputProcessor implements SideInputsProcessor,
Serializable {
@Override
public Collection<Entry<?, ?>> process(IncomingMessageEnvelope message,
KeyValueStore store) {
diff --git
a/samza-test/src/test/java/org/apache/samza/test/util/TestBlobStoreManager.java
b/samza-test/src/test/java/org/apache/samza/test/util/TestBlobStoreManager.java
index d247c10e3..76441f388 100644
---
a/samza-test/src/test/java/org/apache/samza/test/util/TestBlobStoreManager.java
+++
b/samza-test/src/test/java/org/apache/samza/test/util/TestBlobStoreManager.java
@@ -25,6 +25,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.CompletableFuture;
@@ -35,6 +36,7 @@ import org.apache.samza.checkpoint.CheckpointId;
import org.apache.samza.config.Config;
import org.apache.samza.storage.blobstore.BlobStoreManager;
import org.apache.samza.storage.blobstore.Metadata;
+import org.apache.samza.storage.blobstore.exceptions.DeletedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,6 +49,7 @@ public class TestBlobStoreManager implements BlobStoreManager
{
public static final String LEDGER_FILES_DELETED = "filesRemoved";
public static final String LEDGER_FILES_TTL_UPDATED = "filesTTLUpdated";
+ private final String deletedTombstone = "-DELETED";
private final Path stateLocation;
private final File filesAddedLedger;
private final File filesReadLedger;
@@ -97,24 +100,45 @@ public class TestBlobStoreManager implements
BlobStoreManager {
metadata.getTaskName(), metadata.getStoreName(), suffix);
LOG.info("Creating file at {}", destination);
try {
- FileUtils.writeStringToFile(filesAddedLedger, destination + "\n",
Charset.defaultCharset(), true);
FileUtils.copyInputStreamToFile(inputStream, destination.toFile());
+ FileUtils.writeStringToFile(filesAddedLedger, destination + "\n",
Charset.defaultCharset(), true);
} catch (IOException e) {
+ LOG.error("Error creating file: " + destination);
throw new RuntimeException("Error creating file " + destination, e);
}
return CompletableFuture.completedFuture(destination.toString());
}
@Override
- public CompletionStage<Void> get(String id, OutputStream outputStream,
Metadata metadata) {
+ public CompletionStage<Void> get(String id, OutputStream outputStream,
Metadata metadata, boolean getDeletedBlob) {
LOG.info("Reading file at {}", id);
+ Path filePath = Paths.get(id);
try {
- FileUtils.writeStringToFile(filesReadLedger, id + "\n",
Charset.defaultCharset(), true);
- Path path = Paths.get(id);
- Files.copy(path, outputStream);
+ Files.copy(filePath, outputStream);
outputStream.flush();
+ FileUtils.writeStringToFile(filesReadLedger, id + "\n",
Charset.defaultCharset(), true);
+ } catch (NoSuchFileException noSuchFileException) {
+ // Blob marked for deletion is suffixed with '-DELETED'. Retrieve the
deleted blob if getDeletedBlob is true.
+ Path deletedFilePath = Paths.get(id + deletedTombstone);
+ String msg = String.format("File id: %s was not found. GetDeletedBlob
was set to false", id);
+ if (!Files.exists(deletedFilePath)) {
+ throw new RuntimeException("404: " + msg, noSuchFileException);
+ }
+ if (getDeletedBlob) {
+ try {
+ Files.copy(deletedFilePath, outputStream);
+ outputStream.flush();
+ FileUtils.writeStringToFile(filesReadLedger, id + "\n",
Charset.defaultCharset(), true);
+ LOG.info("Deleted File id {} found - Get deleted was set to true",
id);
+ } catch (IOException e) {
+ throw new RuntimeException("Error reading file with GetDeleted set
to true. File: " + id, e);
+ }
+ } else {
+ LOG.info("File id {} is deleted - Get deleted was set to false", id);
+ throw new DeletedException("410: " + msg, noSuchFileException);
+ }
} catch (IOException e) {
- throw new RuntimeException("Error reading file for id " + id, e);
+ throw new RuntimeException("Error reading file for id " + id + ". Get
deleted was set to " + getDeletedBlob, e);
}
return CompletableFuture.completedFuture(null);
}
@@ -124,7 +148,10 @@ public class TestBlobStoreManager implements
BlobStoreManager {
LOG.info("Deleting file at {}", id);
try {
FileUtils.writeStringToFile(filesDeletedLedger, id + "\n",
Charset.defaultCharset(), true);
- Files.delete(Paths.get(id));
+ // Suffix the file name with '-DELETED' instead of actually deleting it.
+ Files.move(Paths.get(id), Paths.get(id + deletedTombstone));
+ } catch (NoSuchFileException ex) {
+ LOG.warn("File might already be deleted. id: " + id);
} catch (IOException e) {
throw new RuntimeException("Error deleting file for id " + id, e);
}