This is an automated email from the ASF dual-hosted git repository.

ajothomas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 0cf9f9a8d Fix get future exception message (#1712)
0cf9f9a8d is described below

commit 0cf9f9a8d44544170919464357310bc2d41a2e11
Author: ajo thomas <[email protected]>
AuthorDate: Tue Oct 15 14:39:43 2024 -0700

    Fix get future exception message (#1712)
---
 .../storage/blobstore/util/BlobStoreUtil.java      |  2 +-
 .../storage/blobstore/util/TestBlobStoreUtil.java  | 74 ++++++++++++++++++++++
 2 files changed, 75 insertions(+), 1 deletion(-)

diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java
index bd78248cb..3735d6529 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java
@@ -185,7 +185,7 @@ public class BlobStoreUtil {
           .thenApplyAsync(f -> snapshotIndexSerde.fromBytes(indexBlobStream.toByteArray()), executor)
           .handle((snapshotIndex, ex) -> {
             if (ex != null) {
-              throw new SamzaException(String.format("Unable to deserialize SnapshotIndex bytes for blob ID: %s", blobId), ex);
+              throw new SamzaException(String.format("Unable to get SnapshotIndex blob. The blob ID is : %s", blobId), ex);
             }
             return snapshotIndex;
           });
diff --git a/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestBlobStoreUtil.java b/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestBlobStoreUtil.java
index a44f86e64..732fd472b 100644
--- a/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestBlobStoreUtil.java
+++ b/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestBlobStoreUtil.java
@@ -28,6 +28,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -50,6 +51,7 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.zip.CRC32;
@@ -78,6 +80,7 @@ import org.apache.samza.storage.blobstore.index.SnapshotIndex;
 import org.apache.samza.storage.blobstore.index.SnapshotMetadata;
 import org.apache.samza.util.FileUtil;
 import org.apache.samza.util.FutureUtil;
+import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
@@ -920,6 +923,24 @@ public class TestBlobStoreUtil {
         checkpoint, storesToBackupOrRestore, false);
   }
 
+  @Test
+  public void testSerdeException() throws ExecutionException, InterruptedException {
+    final String blobId = "foo";
+
+    final BlobStoreManager testBlobStoreManager = new DeserTestBlobStoreManager();
+    final BlobStoreUtil util = new BlobStoreUtil(testBlobStoreManager, Executors.newSingleThreadExecutor(), blobStoreConfig, null, null);
+
+    final CompletableFuture<SnapshotIndex> future = util.getSnapshotIndex(blobId, mock(Metadata.class), true)
+        .handle((snapshotIndex, throwable) -> {
+          if (throwable != null) {
+            Assert.assertEquals(throwable.getMessage(), String.format("Unable to get SnapshotIndex blob. The blob ID is : %s", blobId));
+            Assert.assertEquals(throwable.getCause().getMessage(), "org.apache.samza.SamzaException: Exception in deserializing SnapshotIndex bytes foobar");
+          }
+          return snapshotIndex;
+        });
+    future.get();
+  }
+
   @Test
   public void testGetSSIThrowsExceptionIfAnyNonIgnoredAsyncBlobStoreErrors() {
     String store = "storeName1";
@@ -1045,4 +1066,57 @@ public class TestBlobStoreUtil {
     factoryStoreSCMs.put(stateBackendFactory, storeSCMs);
     return new CheckpointV2(checkpointId, ImmutableMap.of(), factoryStoreSCMs);
   }
+
+  /**
+   * Test {@link BlobStoreManager} to be used to assert SnapshotIndex deserialization failure
+   * exception message.
+   * We write a dummy string's bytes to the OutputStream parameter of get method instead of a SnapshotIndex
+   * blob. The OutputStream is used by SnapshotIndexSerde which will fail during deserialization.
+   * */
+  private static class DeserTestBlobStoreManager extends TestBlobStoreManager {
+    @Override
+    public CompletionStage<Void> get(String id, OutputStream outputStream, Metadata metadata, boolean getDeletedBlob) {
+      final String randBlob = "foobar";
+      final byte[] byteArray = randBlob.getBytes(StandardCharsets.UTF_8);
+      try {
+        outputStream.write(byteArray);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      return CompletableFuture.completedFuture(null);
+    }
+  }
+
+  /**
+   * Test BlobStoreManager for unit tests.
+   * */
+  private static class TestBlobStoreManager implements BlobStoreManager {
+    @Override
+    public void init() {
+    }
+
+    @Override
+    public CompletionStage<String> put(InputStream inputStream, Metadata metadata) {
+      return null;
+    }
+
+    @Override
+    public CompletionStage<Void> get(String id, OutputStream outputStream, Metadata metadata, boolean getDeletedBlob) {
+      return null;
+    }
+
+    @Override
+    public CompletionStage<Void> delete(String id, Metadata metadata) {
+      return null;
+    }
+
+    @Override
+    public CompletionStage<Void> removeTTL(String blobId, Metadata metadata) {
+      return null;
+    }
+
+    @Override
+    public void close() {
+    }
+  }
 }

Reply via email to