This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new df6962e  S3 offloader should throw an error on receiving an empty ledger (#1855)
df6962e is described below

commit df6962e4ce359a22bcad097ac8cad3b55d975dd2
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Tue May 29 20:07:51 2018 +0200

    S3 offloader should throw an error on receiving an empty ledger (#1855)
    
    ManagedLedger should never send an empty ledger for offload, as it's a
    waste of resources. This patch adds a defensive check to ensure that
    if the S3 offload does get an empty ledger, it doesn't even attempt to
    create any resources on the S3 side.
    
    Master issue: #1511
---
 .../broker/s3offload/S3ManagedLedgerOffloader.java |  5 +++++
 .../s3offload/S3ManagedLedgerOffloaderTest.java    | 25 ++++++++++++++++++++++
 2 files changed, 30 insertions(+)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
index 65ffd37..dcfa9e8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
@@ -114,6 +114,11 @@ public class S3ManagedLedgerOffloader implements 
LedgerOffloader {
                                            Map<String, String> extraMetadata) {
         CompletableFuture<Void> promise = new CompletableFuture<>();
         scheduler.chooseThread(readHandle.getId()).submit(() -> {
+            if (readHandle.getLength() == 0 || !readHandle.isClosed() || 
readHandle.getLastAddConfirmed() < 0) {
+                promise.completeExceptionally(
+                        new IllegalArgumentException("An empty or open ledger 
should never be offloaded"));
+                return;
+            }
             OffloadIndexBlockBuilder indexBuilder = 
OffloadIndexBlockBuilder.create()
                 .withLedgerMetadata(readHandle.getLedgerMetadata())
                 
.withDataBlockHeaderLength(BlockAwareSegmentInputStreamImpl.getHeaderSize());
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
index 9f0d253..1b5ad03 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.s3offload;
 import static 
org.apache.pulsar.broker.s3offload.S3ManagedLedgerOffloader.dataBlockOffloadKey;
 import static 
org.apache.pulsar.broker.s3offload.S3ManagedLedgerOffloader.indexBlockOffloadKey;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
 
 import com.amazonaws.AmazonServiceException;
 import com.amazonaws.services.s3.AmazonS3;
@@ -30,6 +31,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Random;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 
@@ -422,5 +424,28 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase {
             Assert.assertTrue(mockS3client.doesObjectExist(BUCKET, 
indexBlockOffloadKey(readHandle.getId(), uuid)));
         }
     }
+
+    @Test
+    public void testOffloadEmpty() throws Exception {
+        CompletableFuture<LedgerEntries> noEntries = new CompletableFuture<>();
+        noEntries.completeExceptionally(new BKException.BKReadException());
+
+        ReadHandle readHandle = Mockito.mock(ReadHandle.class);
+        Mockito.doReturn(-1L).when(readHandle).getLastAddConfirmed();
+        Mockito.doReturn(noEntries).when(readHandle).readAsync(anyLong(), 
anyLong());
+        Mockito.doReturn(0L).when(readHandle).getLength();
+        Mockito.doReturn(true).when(readHandle).isClosed();
+        Mockito.doReturn(1234L).when(readHandle).getId();
+
+        UUID uuid = UUID.randomUUID();
+        LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, 
BUCKET, scheduler,
+                                                                 
DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
+        try {
+            offloader.offload(readHandle, uuid, new HashMap<>()).get();
+            Assert.fail("Shouldn't have been able to offload");
+        } catch (ExecutionException e) {
+            Assert.assertEquals(e.getCause().getClass(), 
IllegalArgumentException.class);
+        }
+    }
 }
 

-- 
To stop receiving notification emails like this one, please contact
si...@apache.org.

Reply via email to