This is an automated email from the ASF dual-hosted git repository.
ajothomas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new d6ffeebbe Log bob deserialization exception for snapshot blobs (#1711)
d6ffeebbe is described below
commit d6ffeebbe4775c0b88c47c0a09fecd3e98505bd8
Author: ajo thomas <[email protected]>
AuthorDate: Wed Oct 9 15:46:09 2024 -0700
Log bob deserialization exception for snapshot blobs (#1711)
---
.../org/apache/samza/storage/blobstore/util/BlobStoreUtil.java | 8 +++++++-
1 file changed, 7 insertions(+), 1 deletion(-)
diff --git
a/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java
b/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java
index 327eb7f33..bd78248cb 100644
---
a/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java
+++
b/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java
@@ -182,7 +182,13 @@ public class BlobStoreUtil {
return FutureUtil.executeAsyncWithRetries(opName, () -> {
ByteArrayOutputStream indexBlobStream = new ByteArrayOutputStream(); //
no need to close ByteArrayOutputStream
return blobStoreManager.get(blobId, indexBlobStream, metadata,
getDeleted).toCompletableFuture()
- .thenApplyAsync(f ->
snapshotIndexSerde.fromBytes(indexBlobStream.toByteArray()), executor);
+ .thenApplyAsync(f ->
snapshotIndexSerde.fromBytes(indexBlobStream.toByteArray()), executor)
+ .handle((snapshotIndex, ex) -> {
+ if (ex != null) {
+ throw new SamzaException(String.format("Unable to deserialize
SnapshotIndex bytes for blob ID: %s", blobId), ex);
+ }
+ return snapshotIndex;
+ });
}, isCauseNonRetriable(), executor, retryPolicyConfig);
}