This is an automated email from the ASF dual-hosted git repository.

ajothomas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new d6ffeebbe Log blob deserialization exception for snapshot blobs (#1711)
d6ffeebbe is described below

commit d6ffeebbe4775c0b88c47c0a09fecd3e98505bd8
Author: ajo thomas <[email protected]>
AuthorDate: Wed Oct 9 15:46:09 2024 -0700

    Log blob deserialization exception for snapshot blobs (#1711)
---
 .../org/apache/samza/storage/blobstore/util/BlobStoreUtil.java    | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java
index 327eb7f33..bd78248cb 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java
@@ -182,7 +182,13 @@ public class BlobStoreUtil {
     return FutureUtil.executeAsyncWithRetries(opName, () -> {
       ByteArrayOutputStream indexBlobStream = new ByteArrayOutputStream(); // 
no need to close ByteArrayOutputStream
       return blobStoreManager.get(blobId, indexBlobStream, metadata, 
getDeleted).toCompletableFuture()
-          .thenApplyAsync(f -> 
snapshotIndexSerde.fromBytes(indexBlobStream.toByteArray()), executor);
+          .thenApplyAsync(f -> 
snapshotIndexSerde.fromBytes(indexBlobStream.toByteArray()), executor)
+          .handle((snapshotIndex, ex) -> {
+            if (ex != null) {
+              throw new SamzaException(String.format("Unable to deserialize 
SnapshotIndex bytes for blob ID: %s", blobId), ex);
+            }
+            return snapshotIndex;
+          });
     }, isCauseNonRetriable(), executor, retryPolicyConfig);
   }
 

Reply via email to