This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5980169   [tiered storage] store driver name and driver specific 
metadata in original ledger metadata (#2398)
5980169 is described below

commit 59801695a9808c4c6574b421f90b0d91a9d712ad
Author: Sijie Guo <guosi...@gmail.com>
AuthorDate: Mon Aug 20 23:21:53 2018 -0700

     [tiered storage] store driver name and driver specific metadata in 
original ledger metadata (#2398)
    
    
    
     ### Motivation
    
    1) Currently the location of an offloaded ledger isn't stored in the 
original ledger metadata.
    That means if the configuration is changed or modified by mistake, we might 
potentially cause data loss.
    
    2) The location of an offloaded ledger is needed by Pulsar SQL, so it is 
very inconvenient to
    have the location information stored only in a configuration, and the approach is 
also problematic.
    
     ### Changes
    
    Store `driverName` and driver-specific metadata (e.g. bucket name, region 
name, endpoint) in the
    original ledger metadata. Change ManagedLedgerImpl to use the 
driver-specific metadata to read
    the offloaded ledger. If the driver-specific metadata is missing, it will 
fall back to using the configuration.
    
     ### Tests
    
    This change doesn't change the behavior. Existing unit tests and 
integration tests already cover the logic.
    
     ### NOTES
    
    Currently the driver name in the metadata is not used. We need to use the driver 
name to load different offloader drivers
    after #2393 is implemented.
---
 .../apache/bookkeeper/mledger/LedgerOffloader.java |  33 ++-
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  69 ++++++-
 .../mledger/impl/NullLedgerOffloader.java          |  11 +-
 .../bookkeeper/mledger/offload/OffloadUtils.java   |  91 +++++++++
 managed-ledger/src/main/proto/MLDataFormats.proto  |  11 +
 .../mledger/impl/OffloadPrefixReadTest.java        |  27 ++-
 .../bookkeeper/mledger/impl/OffloadPrefixTest.java |  16 +-
 .../pulsar/broker/admin/AdminApiOffloadTest.java   |   3 +
 tiered-storage/jcloud/pom.xml                      |   2 +
 .../impl/BlobStoreManagedLedgerOffloader.java      | 221 ++++++++++++++++-----
 .../impl/BlobStoreManagedLedgerOffloaderTest.java  |  19 +-
 11 files changed, 419 insertions(+), 84 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
index 6885500..8fc35cc 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.mledger;
 
 import com.google.common.annotations.Beta;
 
+import java.util.Collections;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -31,6 +32,25 @@ import org.apache.bookkeeper.client.api.ReadHandle;
  */
 @Beta
 public interface LedgerOffloader {
+
+    /**
+     * Get offload driver name.
+     *
+     * @return offload driver name.
+     */
+    String getOffloadDriverName();
+
+    /**
+     * Get offload driver metadata.
+     *
+     * <p>The driver metadata will be recorded as part of the metadata of the 
original ledger.
+     *
+     * @return offload driver metadata.
+     */
+    default Map<String, String> getOffloadDriverMetadata() {
+        return Collections.emptyMap();
+    }
+
     /**
      * Offload the passed in ledger to longterm storage.
      * Metadata passed in is for inspection purposes only and should be stored
@@ -51,10 +71,9 @@ public interface LedgerOffloader {
      *
      * @param ledger the ledger to offload
      * @param uid unique id to identity this offload attempt
-     * @param extraMetadata metadata to be stored with the ledger for 
informational
+     * @param extraMetadata metadata to be stored with the offloaded ledger 
for informational
      *                      purposes
-     * @return a future, which when completed, denotes that the offload has 
been
-     *         successful
+     * @return a future, which when completed, denotes that the offload has 
been successful.
      */
     CompletableFuture<Void> offload(ReadHandle ledger,
                                     UUID uid,
@@ -69,9 +88,11 @@ public interface LedgerOffloader {
      *
      * @param ledgerId the ID of the ledger to load from longterm storage
      * @param uid unique ID for previous successful offload attempt
+     * @param offloadDriverMetadata offload driver metadata
      * @return a future, which when completed, returns a ReadHandle
      */
-    CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid);
+    CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid,
+                                                Map<String, String> 
offloadDriverMetadata);
 
     /**
      * Delete a ledger from long term storage.
@@ -81,9 +102,11 @@ public interface LedgerOffloader {
      *
      * @param ledgerId the ID of the ledger to delete from longterm storage
      * @param uid unique ID for previous offload attempt
+     * @param offloadDriverMetadata offload driver metadata
      * @return a future, which when completed, signifies that the ledger has
      *         been deleted
      */
-    CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid);
+    CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid,
+                                            Map<String, String> 
offloadDriverMetadata);
 }
 
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 388cdef..6e4a609 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -96,6 +96,7 @@ import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
 import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
 import org.apache.bookkeeper.mledger.impl.MetaStore.Stat;
+import org.apache.bookkeeper.mledger.offload.OffloadUtils;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
 import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.NestedPositionInfo;
@@ -1390,7 +1391,9 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                 if (info != null && info.hasOffloadContext() && 
info.getOffloadContext().getComplete()) {
                     UUID uid = new UUID(info.getOffloadContext().getUidMsb(),
                                         info.getOffloadContext().getUidLsb());
-                    openFuture = 
config.getLedgerOffloader().readOffloaded(ledgerId, uid);
+                    // TODO: improve this to load ledger offloader by driver 
name recorded in metadata
+                    openFuture = config.getLedgerOffloader()
+                        .readOffloaded(ledgerId, uid, 
OffloadUtils.getOffloadDriverMetadata(info));
                 } else {
                     openFuture = bookKeeper.newOpenLedgerOp()
                         .withRecovery(!isReadOnly())
@@ -1771,7 +1774,16 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
             }
             for (LedgerInfo ls : offloadedLedgersToDelete) {
                 LedgerInfo.Builder newInfoBuilder = ls.toBuilder();
-                
newInfoBuilder.getOffloadContextBuilder().setBookkeeperDeleted(true);
+                newInfoBuilder.getOffloadContextBuilder()
+                    .setBookkeeperDeleted(true);
+                String driverName = OffloadUtils.getOffloadDriverName(
+                    ls, config.getLedgerOffloader().getOffloadDriverName());
+                Map<String, String> driverMetadata = 
OffloadUtils.getOffloadDriverMetadata(
+                    ls, 
config.getLedgerOffloader().getOffloadDriverMetadata());
+                OffloadUtils.setOffloadDriverMetadata(
+                    newInfoBuilder,
+                    driverName, driverMetadata
+                );
                 ledgers.put(ls.getLedgerId(), newInfoBuilder.build());
             }
 
@@ -1903,7 +1915,11 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         if (info.getOffloadContext().hasUidMsb()) {
             UUID uuid = new UUID(info.getOffloadContext().getUidMsb(),
                                  info.getOffloadContext().getUidLsb());
-            cleanupOffloaded(ledgerId, uuid, "Trimming");
+            cleanupOffloaded(
+                ledgerId, uuid,
+                OffloadUtils.getOffloadDriverName(info, 
config.getLedgerOffloader().getOffloadDriverName()),
+                OffloadUtils.getOffloadDriverMetadata(info, 
config.getLedgerOffloader().getOffloadDriverMetadata()),
+                "Trimming");
         }
     }
 
@@ -2105,7 +2121,10 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
             UUID uuid = UUID.randomUUID();
             Map<String, String> extraMetadata = 
ImmutableMap.of("ManagedLedgerName", name);
 
-            prepareLedgerInfoForOffloaded(ledgerId, uuid)
+            String driverName = 
config.getLedgerOffloader().getOffloadDriverName();
+            Map<String, String> driverMetadata = 
config.getLedgerOffloader().getOffloadDriverMetadata();
+
+            prepareLedgerInfoForOffloaded(ledgerId, uuid, driverName, 
driverMetadata)
                 .thenCompose((ignore) -> getLedgerHandle(ledgerId))
                 .thenCompose(readHandle -> 
config.getLedgerOffloader().offload(readHandle, uuid, extraMetadata))
                 .thenCompose((ignore) -> {
@@ -2116,7 +2135,10 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                                            scheduledExecutor, name)
                             .whenComplete((ignore2, exception) -> {
                                     if (exception != null) {
-                                        cleanupOffloaded(ledgerId, uuid, 
"Metastore failure");
+                                        cleanupOffloaded(
+                                            ledgerId, uuid,
+                                            driverName, driverMetadata,
+                                            "Metastore failure");
                                     }
                                 });
                     })
@@ -2216,7 +2238,10 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         }
     }
 
-    private CompletableFuture<Void> prepareLedgerInfoForOffloaded(long 
ledgerId, UUID uuid) {
+    private CompletableFuture<Void> prepareLedgerInfoForOffloaded(long 
ledgerId,
+                                                                  UUID uuid,
+                                                                  String 
offloadDriverName,
+                                                                  Map<String, 
String> offloadDriverMetadata) {
         log.info("[{}] Preparing metadata to offload ledger {} with uuid {}", 
name, ledgerId, uuid);
         return transformLedgerInfo(ledgerId,
                                    (oldInfo) -> {
@@ -2225,12 +2250,24 @@ public class ManagedLedgerImpl implements 
ManagedLedger, CreateCallback {
                                                                    
oldInfo.getOffloadContext().getUidLsb());
                                            log.info("[{}] Found previous 
offload attempt for ledger {}, uuid {}"
                                                     + ", cleaning up", name, 
ledgerId, uuid);
-                                           cleanupOffloaded(ledgerId, oldUuid, 
"Previous failed offload");
+                                           cleanupOffloaded(
+                                               ledgerId,
+                                               oldUuid,
+                                               
OffloadUtils.getOffloadDriverName(oldInfo,
+                                                   
config.getLedgerOffloader().getOffloadDriverName()),
+                                               
OffloadUtils.getOffloadDriverMetadata(oldInfo,
+                                                   
config.getLedgerOffloader().getOffloadDriverMetadata()),
+                                               "Previous failed offload");
                                        }
                                        LedgerInfo.Builder builder = 
oldInfo.toBuilder();
                                        builder.getOffloadContextBuilder()
                                            
.setUidMsb(uuid.getMostSignificantBits())
                                            
.setUidLsb(uuid.getLeastSignificantBits());
+                                       OffloadUtils.setOffloadDriverMetadata(
+                                           builder,
+                                           offloadDriverName,
+                                           offloadDriverMetadata
+                                       );
                                        return builder.build();
                                    })
             .whenComplete((result, exception) -> {
@@ -2254,6 +2291,16 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                                            builder.getOffloadContextBuilder()
                                                .setTimestamp(clock.millis())
                                                .setComplete(true);
+
+                                           String driverName = 
OffloadUtils.getOffloadDriverName(
+                                               oldInfo, 
config.getLedgerOffloader().getOffloadDriverName());
+                                           Map<String, String> driverMetadata 
= OffloadUtils.getOffloadDriverMetadata(
+                                               oldInfo, 
config.getLedgerOffloader().getOffloadDriverMetadata());
+                                           
OffloadUtils.setOffloadDriverMetadata(
+                                               builder,
+                                               driverName,
+                                               driverMetadata
+                                           );
                                            return builder.build();
                                        } else {
                                            throw new OffloadConflict(
@@ -2272,10 +2319,14 @@ public class ManagedLedgerImpl implements 
ManagedLedger, CreateCallback {
                 });
     }
 
-    private void cleanupOffloaded(long ledgerId, UUID uuid, String 
cleanupReason) {
+    private void cleanupOffloaded(long ledgerId,
+                                  UUID uuid,
+                                  String offloadDriverName, /* TODO: use 
driver name to identify offloader */
+                                  Map<String, String> offloadDriverMetadata,
+                                  String cleanupReason) {
         Retries.run(Backoff.exponentialJittered(TimeUnit.SECONDS.toMillis(1), 
TimeUnit.SECONDS.toHours(1)).limit(10),
                     Retries.NonFatalPredicate,
-                    () -> 
config.getLedgerOffloader().deleteOffloaded(ledgerId, uuid),
+                    () -> 
config.getLedgerOffloader().deleteOffloaded(ledgerId, uuid, 
offloadDriverMetadata),
                     scheduledExecutor, name)
             .whenComplete((ignored, exception) -> {
                     if (exception != null) {
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java
index cd76a1b..3401f1b 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java
@@ -32,6 +32,11 @@ public class NullLedgerOffloader implements LedgerOffloader {
     public static NullLedgerOffloader INSTANCE = new NullLedgerOffloader();
 
     @Override
+    public String getOffloadDriverName() {
+        return "NullLedgerOffloader";
+    }
+
+    @Override
     public CompletableFuture<Void> offload(ReadHandle ledger,
                                            UUID uid,
                                            Map<String, String> extraMetadata) {
@@ -41,14 +46,16 @@ public class NullLedgerOffloader implements LedgerOffloader 
{
     }
 
     @Override
-    public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID 
uid) {
+    public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid,
+                                                       Map<String, String> 
offloadDriverMetadata) {
         CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
         promise.completeExceptionally(new UnsupportedOperationException());
         return promise;
     }
 
     @Override
-    public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid) {
+    public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid,
+                                                   Map<String, String> 
offloadDriverMetadata) {
         CompletableFuture<Void> promise = new CompletableFuture<>();
         promise.completeExceptionally(new UnsupportedOperationException());
         return promise;
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java
new file mode 100644
index 0000000..44ebc80
--- /dev/null
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload;
+
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.KeyValue;
+import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.OffloadContext;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.OffloadDriverMetadata;
+
+public final class OffloadUtils {
+
+    private OffloadUtils() {}
+
+    public static Map<String, String> getOffloadDriverMetadata(LedgerInfo 
ledgerInfo) {
+        Map<String, String> metadata = Maps.newHashMap();
+        if (ledgerInfo.hasOffloadContext()) {
+            OffloadContext ctx = ledgerInfo.getOffloadContext();
+            if (ctx.hasDriverMetadata()) {
+                OffloadDriverMetadata driverMetadata = ctx.getDriverMetadata();
+                if (driverMetadata.getPropertiesCount() > 0) {
+                    driverMetadata.getPropertiesList().forEach(kv -> 
metadata.put(kv.getKey(), kv.getValue()));
+                }
+            }
+        }
+        return metadata;
+    }
+
+    public static Map<String, String> getOffloadDriverMetadata(LedgerInfo 
ledgerInfo,
+                                                               Map<String, 
String> defaultOffloadDriverMetadata) {
+        if (ledgerInfo.hasOffloadContext()) {
+            OffloadContext ctx = ledgerInfo.getOffloadContext();
+            if (ctx.hasDriverMetadata()) {
+                OffloadDriverMetadata driverMetadata = ctx.getDriverMetadata();
+                if (driverMetadata.getPropertiesCount() > 0) {
+                    Map<String, String> metadata = Maps.newHashMap();
+                    driverMetadata.getPropertiesList().forEach(kv -> 
metadata.put(kv.getKey(), kv.getValue()));
+                    return metadata;
+                }
+            }
+        }
+        return defaultOffloadDriverMetadata;
+    }
+
+    public static String getOffloadDriverName(LedgerInfo ledgerInfo, String 
defaultDriverName) {
+        if (ledgerInfo.hasOffloadContext()) {
+            OffloadContext ctx = ledgerInfo.getOffloadContext();
+            if (ctx.hasDriverMetadata()) {
+                OffloadDriverMetadata driverMetadata = ctx.getDriverMetadata();
+                if (driverMetadata.hasName()) {
+                    return driverMetadata.getName();
+                }
+            }
+        }
+        return defaultDriverName;
+    }
+
+    public static void setOffloadDriverMetadata(LedgerInfo.Builder infoBuilder,
+                                                String driverName,
+                                                Map<String, String> 
offloadDriverMetadata) {
+        infoBuilder.getOffloadContextBuilder()
+            .getDriverMetadataBuilder()
+            .setName(driverName);
+        offloadDriverMetadata.forEach((k, v) -> {
+            infoBuilder.getOffloadContextBuilder()
+                .getDriverMetadataBuilder()
+                .addProperties(KeyValue.newBuilder()
+                    .setKey(k)
+                    .setValue(v)
+                    .build());
+        });
+    }
+
+}
diff --git a/managed-ledger/src/main/proto/MLDataFormats.proto 
b/managed-ledger/src/main/proto/MLDataFormats.proto
index 0d5ad3a..4dbd231 100644
--- a/managed-ledger/src/main/proto/MLDataFormats.proto
+++ b/managed-ledger/src/main/proto/MLDataFormats.proto
@@ -21,12 +21,23 @@ syntax = "proto2";
 option java_package = "org.apache.bookkeeper.mledger.proto";
 option optimize_for = SPEED;
 
+message KeyValue {
+       required string key = 1;
+       required string value = 2;
+}
+
+message OffloadDriverMetadata {
+    required string name = 1;
+    repeated KeyValue properties = 2;
+}
+
 message OffloadContext {
     optional int64 uidMsb = 1;
     optional int64 uidLsb = 2;
     optional bool complete = 3;
     optional bool bookkeeperDeleted = 4;
     optional int64 timestamp = 5;
+    optional OffloadDriverMetadata driverMetadata = 6;
 }
 
 message ManagedLedgerInfo {
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
index 8bbb44a..4bf518f 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
@@ -19,6 +19,7 @@
 package org.apache.bookkeeper.mledger.impl;
 
 import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyMap;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.spy;
@@ -30,6 +31,7 @@ import com.google.common.collect.Lists;
 
 import io.netty.buffer.ByteBuf;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -51,7 +53,6 @@ import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
-import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 
@@ -98,25 +99,33 @@ public class OffloadPrefixReadTest extends 
MockedBookKeeperTestCase {
         for (Entry e : cursor.readEntries(10)) {
             Assert.assertEquals(new String(e.getData()), "entry-" + i++);
         }
-        verify(offloader, times(1)).readOffloaded(anyLong(), anyObject());
-        verify(offloader).readOffloaded(anyLong(), eq(firstLedgerUUID));
+        verify(offloader, times(1))
+            .readOffloaded(anyLong(), anyObject(), anyMap());
+        verify(offloader).readOffloaded(anyLong(), eq(firstLedgerUUID), 
anyMap());
 
         for (Entry e : cursor.readEntries(10)) {
             Assert.assertEquals(new String(e.getData()), "entry-" + i++);
         }
-        verify(offloader, times(2)).readOffloaded(anyLong(), anyObject());
-        verify(offloader).readOffloaded(anyLong(), eq(secondLedgerUUID));
+        verify(offloader, times(2))
+            .readOffloaded(anyLong(), anyObject(), anyMap());
+        verify(offloader).readOffloaded(anyLong(), eq(secondLedgerUUID), 
anyMap());
 
         for (Entry e : cursor.readEntries(5)) {
             Assert.assertEquals(new String(e.getData()), "entry-" + i++);
         }
-        verify(offloader, times(2)).readOffloaded(anyLong(), anyObject());
+        verify(offloader, times(2))
+            .readOffloaded(anyLong(), anyObject(), anyMap());
     }
 
     static class MockLedgerOffloader implements LedgerOffloader {
         ConcurrentHashMap<UUID, ReadHandle> offloads = new 
ConcurrentHashMap<UUID, ReadHandle>();
 
         @Override
+        public String getOffloadDriverName() {
+            return "mock";
+        }
+
+        @Override
         public CompletableFuture<Void> offload(ReadHandle ledger,
                                                UUID uuid,
                                                Map<String, String> 
extraMetadata) {
@@ -131,12 +140,14 @@ public class OffloadPrefixReadTest extends 
MockedBookKeeperTestCase {
         }
 
         @Override
-        public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID 
uuid) {
+        public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID 
uuid,
+                                                           Map<String, String> 
offloadDriverMetadata) {
             return CompletableFuture.completedFuture(offloads.get(uuid));
         }
 
         @Override
-        public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID 
uuid) {
+        public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID 
uuid,
+                                                       Map<String, String> 
offloadDriverMetadata) {
             offloads.remove(uuid);
             return CompletableFuture.completedFuture(null);
         };
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
index 6d21ee2..351d86b 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
@@ -493,9 +493,10 @@ public class OffloadPrefixTest extends 
MockedBookKeeperTestCase {
                 }
 
                 @Override
-                public CompletableFuture<Void> deleteOffloaded(long ledgerId, 
UUID uuid) {
+                public CompletableFuture<Void> deleteOffloaded(long ledgerId, 
UUID uuid,
+                                                               Map<String, 
String> offloadDriverMetadata) {
                     deleted.add(Pair.of(ledgerId, uuid));
-                    return super.deleteOffloaded(ledgerId, uuid);
+                    return super.deleteOffloaded(ledgerId, uuid, 
offloadDriverMetadata);
                 }
             };
         ManagedLedgerConfig config = new ManagedLedgerConfig();
@@ -929,6 +930,11 @@ public class OffloadPrefixTest extends 
MockedBookKeeperTestCase {
         }
 
         @Override
+        public String getOffloadDriverName() {
+            return "mock";
+        }
+
+        @Override
         public CompletableFuture<Void> offload(ReadHandle ledger,
                                                UUID uuid,
                                                Map<String, String> 
extraMetadata) {
@@ -942,14 +948,16 @@ public class OffloadPrefixTest extends 
MockedBookKeeperTestCase {
         }
 
         @Override
-        public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID 
uuid) {
+        public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID 
uuid,
+                                                           Map<String, String> 
offloadDriverMetadata) {
             CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
             promise.completeExceptionally(new UnsupportedOperationException());
             return promise;
         }
 
         @Override
-        public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID 
uuid) {
+        public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID 
uuid,
+                                                       Map<String, String> 
offloadDriverMetadata) {
             CompletableFuture<Void> promise = new CompletableFuture<>();
             if (offloads.remove(ledgerId, uuid)) {
                 deletes.put(ledgerId, uuid);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
index dc925a2..63b4d84 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
@@ -23,6 +23,7 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import com.google.common.collect.Sets;
 
@@ -78,6 +79,8 @@ public class AdminApiOffloadTest extends 
MockedPulsarServiceBaseTest {
 
     private void testOffload(String topicName, String mlName) throws Exception 
{
         LedgerOffloader offloader = mock(LedgerOffloader.class);
+        when(offloader.getOffloadDriverName()).thenReturn("mock");
+
         doReturn(offloader).when(pulsar).getManagedLedgerOffloader();
 
         CompletableFuture<Void> promise = new CompletableFuture<>();
diff --git a/tiered-storage/jcloud/pom.xml b/tiered-storage/jcloud/pom.xml
index eb4636f..fcd3300 100644
--- a/tiered-storage/jcloud/pom.xml
+++ b/tiered-storage/jcloud/pom.xml
@@ -42,6 +42,7 @@
       <groupId>org.apache.pulsar</groupId>
       <artifactId>jclouds-shaded</artifactId>
       <version>${project.version}</version>
+        <!--
       <exclusions>
         <exclusion>
           <groupId>com.google.code.gson</groupId>
@@ -68,6 +69,7 @@
           <artifactId>*</artifactId>
         </exclusion>
       </exclusions>
+      -->
     </dependency>
     <dependency>
       <groupId>com.amazonaws</groupId>
diff --git 
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
 
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
index e09a66b..f96afaf 100644
--- 
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
+++ 
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
@@ -36,6 +36,9 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import lombok.Data;
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
@@ -43,6 +46,7 @@ import 
org.apache.bookkeeper.mledger.offload.jcloud.BlockAwareSegmentInputStream
 import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock;
 import 
org.apache.bookkeeper.mledger.offload.jcloud.TieredStorageConfigurationData;
 import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder;
+import org.apache.commons.lang3.tuple.Pair;
 import org.jclouds.Constants;
 import org.jclouds.ContextBuilder;
 import org.jclouds.blobstore.BlobStore;
@@ -66,6 +70,10 @@ import org.slf4j.LoggerFactory;
 public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
     private static final Logger log = 
LoggerFactory.getLogger(BlobStoreManagedLedgerOffloader.class);
 
+    private static final String METADATA_FIELD_BUCKET = "bucket";
+    private static final String METADATA_FIELD_REGION = "region";
+    private static final String METADATA_FIELD_ENDPOINT = "endpoint";
+
     public static final String[] DRIVER_NAMES = {"S3", "aws-s3", 
"google-cloud-storage"};
 
     // use these keys for both s3 and gcs.
@@ -91,6 +99,42 @@ public class BlobStoreManagedLedgerOffloader implements 
LedgerOffloader {
         blobBuilder.userMetadata(metadataBuilder.build());
     }
 
+    @Data(staticConstructor = "of")
+    private static class BlobStoreLocation {
+        private final String region;
+        private final String endpoint;
+    }
+
+    private static Pair<BlobStoreLocation, BlobStore> createBlobStore(String 
driver,
+                                                                      String 
region,
+                                                                      String 
endpoint,
+                                                                      
Credentials credentials,
+                                                                      int 
maxBlockSize) {
+        Properties overrides = new Properties();
+        // This property controls the number of parts being uploaded in 
parallel.
+        overrides.setProperty("jclouds.mpu.parallel.degree", "1");
+        overrides.setProperty("jclouds.mpu.parts.size", 
Integer.toString(maxBlockSize));
+        overrides.setProperty(Constants.PROPERTY_SO_TIMEOUT, "25000");
+        overrides.setProperty(Constants.PROPERTY_MAX_RETRIES, 
Integer.toString(100));
+
+        ContextBuilder contextBuilder = ContextBuilder.newBuilder(driver);
+        contextBuilder.credentials(credentials.identity, 
credentials.credential);
+
+        if (isS3Driver(driver) && !Strings.isNullOrEmpty(endpoint)) {
+            contextBuilder.endpoint(endpoint);
+            
overrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "false");
+        }
+        contextBuilder.overrides(overrides);
+        BlobStoreContext context = 
contextBuilder.buildView(BlobStoreContext.class);
+        BlobStore blobStore = context.getBlobStore();
+
+        log.info("Connect to blobstore : driver: {}, region: {}, endpoint: {}",
+            driver, region, endpoint);
+        return Pair.of(
+            BlobStoreLocation.of(region, endpoint),
+            blobStore);
+    }
+
     private final VersionCheck VERSION_CHECK = (key, blob) -> {
         // NOTE all metadata in jclouds comes out as lowercase, in an effort 
to normalize the providers
         String version = 
blob.getMetadata().getUserMetadata().get(METADATA_FORMAT_VERSION_KEY.toLowerCase());
@@ -102,16 +146,28 @@ public class BlobStoreManagedLedgerOffloader implements 
LedgerOffloader {
 
     private final OrderedScheduler scheduler;
 
-    // container in jclouds
-    private final String bucket;
+    // container in jclouds to write offloaded ledgers
+    private final String writeBucket;
+    // the region to write offloaded ledgers
+    private final String writeRegion;
+    // the endpoint
+    private final String writeEndpoint;
+    // credentials
+    private final Credentials credentials;
+
     // max block size for each data block.
     private int maxBlockSize;
     private final int readBufferSize;
 
-    private BlobStoreContext context;
-    private BlobStore blobStore;
-    Location location = null;
+    private final BlobStore writeBlobStore;
+    private final Location writeLocation;
+
+    private final ConcurrentMap<BlobStoreLocation, BlobStore> readBlobStores = 
new ConcurrentHashMap<>();
+
+    // metadata to be stored as part of the offloaded ledger metadata
     private final Map<String, String> userMetadata;
+    // offload driver metadata to be stored as part of the original ledger 
metadata
+    private final String offloadDriverName;
 
     @VisibleForTesting
     static BlobStoreManagedLedgerOffloader 
create(TieredStorageConfigurationData conf,
@@ -124,6 +180,9 @@ public class BlobStoreManagedLedgerOffloader implements 
LedgerOffloader {
                                                          OrderedScheduler 
scheduler)
             throws IOException {
         String driver = conf.getManagedLedgerOffloadDriver();
+        if ("s3".equals(driver.toLowerCase())) {
+            driver = "aws-s3";
+        }
         if (!driverSupported(driver)) {
             throw new IOException(
                 "Not support this kind of driver as offload backend: " + 
driver);
@@ -217,35 +276,34 @@ public class BlobStoreManagedLedgerOffloader implements 
LedgerOffloader {
                                     int maxBlockSize, int readBufferSize,
                                     String endpoint, String region, 
Credentials credentials,
                                     Map<String, String> userMetadata) {
+        this.offloadDriverName = driver;
         this.scheduler = scheduler;
         this.readBufferSize = readBufferSize;
-        this.bucket = container;
+        this.writeBucket = container;
+        this.writeRegion = region;
+        this.writeEndpoint = endpoint;
         this.maxBlockSize = maxBlockSize;
         this.userMetadata = userMetadata;
+        this.credentials = credentials;
 
-        Properties overrides = new Properties();
-        // This property controls the number of parts being uploaded in 
parallel.
-        overrides.setProperty("jclouds.mpu.parallel.degree", "1");
-        overrides.setProperty("jclouds.mpu.parts.size", 
Integer.toString(maxBlockSize));
-        overrides.setProperty(Constants.PROPERTY_SO_TIMEOUT, "25000");
-        overrides.setProperty(Constants.PROPERTY_MAX_RETRIES, 
Integer.toString(100));
-
-        ContextBuilder contextBuilder = ContextBuilder.newBuilder(driver);
-        contextBuilder.credentials(credentials.identity, 
credentials.credential);
-
-        if (isS3Driver(driver) && !Strings.isNullOrEmpty(endpoint)) {
-            contextBuilder.endpoint(endpoint);
-            
overrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "false");
-        }
         if (!Strings.isNullOrEmpty(region)) {
-            this.location = new 
LocationBuilder().scope(LocationScope.REGION).id(region).description(region).build();
+            this.writeLocation = new LocationBuilder()
+                .scope(LocationScope.REGION)
+                .id(region)
+                .description(region)
+                .build();
+        } else {
+            this.writeLocation = null;
         }
 
-        log.info("Constructor driver: {}, host: {}, container: {}, region: {} 
",  driver, endpoint, bucket, region);
+        log.info("Constructor offload driver: {}, host: {}, container: {}, 
region: {} ",
+            driver, endpoint, container, region);
 
-        contextBuilder.overrides(overrides);
-        this.context = contextBuilder.buildView(BlobStoreContext.class);
-        this.blobStore = context.getBlobStore();
+        Pair<BlobStoreLocation, BlobStore> blobStore = createBlobStore(
+            driver, region, endpoint, credentials, maxBlockSize
+        );
+        this.writeBlobStore = blobStore.getRight();
+        this.readBlobStores.put(blobStore.getLeft(), blobStore.getRight());
     }
 
     // build context for jclouds BlobStoreContext, mostly used in test
@@ -258,12 +316,22 @@ public class BlobStoreManagedLedgerOffloader implements 
LedgerOffloader {
     BlobStoreManagedLedgerOffloader(BlobStore blobStore, String container, 
OrderedScheduler scheduler,
                                     int maxBlockSize, int readBufferSize,
                                     Map<String, String> userMetadata) {
+        this.offloadDriverName = "aws-s3";
         this.scheduler = scheduler;
         this.readBufferSize = readBufferSize;
-        this.bucket = container;
+        this.writeBucket = container;
+        this.writeRegion = null;
+        this.writeEndpoint = null;
         this.maxBlockSize = maxBlockSize;
-        this.blobStore = blobStore;
+        this.writeBlobStore = blobStore;
+        this.writeLocation = null;
         this.userMetadata = userMetadata;
+        this.credentials = null;
+
+        readBlobStores.put(
+            BlobStoreLocation.of(writeRegion, writeEndpoint),
+            blobStore
+        );
     }
 
     static String dataBlockOffloadKey(long ledgerId, UUID uuid) {
@@ -274,12 +342,26 @@ public class BlobStoreManagedLedgerOffloader implements 
LedgerOffloader {
         return String.format("%s-ledger-%d-index", uuid.toString(), ledgerId);
     }
 
-    public boolean createBucket() {
-        return blobStore.createContainerInLocation(location, bucket);
+    public boolean createBucket(String bucket) {
+        return writeBlobStore.createContainerInLocation(writeLocation, bucket);
+    }
+
+    public void deleteBucket(String bucket) {
+        writeBlobStore.deleteContainer(bucket);
+    }
+
+    @Override
+    public String getOffloadDriverName() {
+        return offloadDriverName;
     }
 
-    public void deleteBucket() {
-        blobStore.deleteContainer(bucket);
+    @Override
+    public Map<String, String> getOffloadDriverMetadata() {
+        return ImmutableMap.of(
+            METADATA_FIELD_BUCKET, writeBucket,
+            METADATA_FIELD_REGION, writeRegion,
+            METADATA_FIELD_ENDPOINT, writeEndpoint
+        );
     }
 
     // upload DataBlock to s3 using MultiPartUpload, and indexBlock in a new 
Block,
@@ -305,10 +387,10 @@ public class BlobStoreManagedLedgerOffloader implements 
LedgerOffloader {
 
             // init multi part upload for data block.
             try {
-                BlobBuilder blobBuilder = blobStore.blobBuilder(dataBlockKey);
+                BlobBuilder blobBuilder = 
writeBlobStore.blobBuilder(dataBlockKey);
                 addVersionInfo(blobBuilder, userMetadata);
                 Blob blob = blobBuilder.build();
-                mpu = blobStore.initiateMultipartUpload(bucket, 
blob.getMetadata(), new PutOptions());
+                mpu = writeBlobStore.initiateMultipartUpload(writeBucket, 
blob.getMetadata(), new PutOptions());
             } catch (Throwable t) {
                 promise.completeExceptionally(t);
                 return;
@@ -330,9 +412,9 @@ public class BlobStoreManagedLedgerOffloader implements 
LedgerOffloader {
                         Payload partPayload = 
Payloads.newInputStreamPayload(blockStream);
                         
partPayload.getContentMetadata().setContentLength((long)blockSize);
                         
partPayload.getContentMetadata().setContentType("application/octet-stream");
-                        parts.add(blobStore.uploadMultipartPart(mpu, partId, 
partPayload));
+                        parts.add(writeBlobStore.uploadMultipartPart(mpu, 
partId, partPayload));
                         log.debug("UploadMultipartPart. container: {}, 
blobName: {}, partId: {}, mpu: {}",
-                            bucket, dataBlockKey, partId, mpu.id());
+                            writeBucket, dataBlockKey, partId, mpu.id());
 
                         indexBuilder.addBlock(startEntry, partId, blockSize);
 
@@ -349,16 +431,16 @@ public class BlobStoreManagedLedgerOffloader implements 
LedgerOffloader {
                     dataObjectLength += blockSize;
                 }
 
-                blobStore.completeMultipartUpload(mpu, parts);
+                writeBlobStore.completeMultipartUpload(mpu, parts);
                 mpu = null;
             } catch (Throwable t) {
                 try {
                     if (mpu != null) {
-                        blobStore.abortMultipartUpload(mpu);
+                        writeBlobStore.abortMultipartUpload(mpu);
                     }
                 } catch (Throwable throwable) {
                     log.error("Failed abortMultipartUpload in bucket - {} with 
key - {}, uploadId - {}.",
-                        bucket, dataBlockKey, mpu.id(), throwable);
+                        writeBucket, dataBlockKey, mpu.id(), throwable);
                 }
                 promise.completeExceptionally(t);
                 return;
@@ -368,7 +450,7 @@ public class BlobStoreManagedLedgerOffloader implements 
LedgerOffloader {
             try (OffloadIndexBlock index = 
indexBuilder.withDataObjectLength(dataObjectLength).build();
                  OffloadIndexBlock.IndexInputStream indexStream = 
index.toStream()) {
                 // write the index block
-                BlobBuilder blobBuilder = blobStore.blobBuilder(indexBlockKey);
+                BlobBuilder blobBuilder = 
writeBlobStore.blobBuilder(indexBlockKey);
                 addVersionInfo(blobBuilder, userMetadata);
                 Payload indexPayload = 
Payloads.newInputStreamPayload(indexStream);
                 
indexPayload.getContentMetadata().setContentLength((long)indexStream.getStreamSize());
@@ -379,14 +461,14 @@ public class BlobStoreManagedLedgerOffloader implements 
LedgerOffloader {
                     .contentLength((long)indexStream.getStreamSize())
                     .build();
 
-                blobStore.putBlob(bucket, blob);
+                writeBlobStore.putBlob(writeBucket, blob);
                 promise.complete(null);
             } catch (Throwable t) {
                 try {
-                    blobStore.removeBlob(bucket, dataBlockKey);
+                    writeBlobStore.removeBlob(writeBucket, dataBlockKey);
                 } catch (Throwable throwable) {
                     log.error("Failed deleteObject in bucket - {} with key - 
{}.",
-                        bucket, dataBlockKey, throwable);
+                        writeBucket, dataBlockKey, throwable);
                 }
                 promise.completeExceptionally(t);
                 return;
@@ -395,16 +477,57 @@ public class BlobStoreManagedLedgerOffloader implements 
LedgerOffloader {
         return promise;
     }
 
+    String getReadRegion(Map<String, String> offloadDriverMetadata) {
+        return offloadDriverMetadata.getOrDefault(METADATA_FIELD_REGION, 
writeRegion);
+    }
+
+    String getReadBucket(Map<String, String> offloadDriverMetadata) {
+        return offloadDriverMetadata.getOrDefault(METADATA_FIELD_BUCKET, 
writeBucket);
+    }
+
+    String getReadEndpoint(Map<String, String> offloadDriverMetadata) {
+        return offloadDriverMetadata.getOrDefault(METADATA_FIELD_ENDPOINT, 
writeEndpoint);
+    }
+
+    BlobStore getReadBlobStore(Map<String, String> offloadDriverMetadata) {
+        BlobStoreLocation location = BlobStoreLocation.of(
+            getReadRegion(offloadDriverMetadata),
+            getReadEndpoint(offloadDriverMetadata)
+        );
+        BlobStore blobStore = readBlobStores.get(location);
+        if (null == blobStore) {
+            blobStore = createBlobStore(
+                offloadDriverName,
+                location.getRegion(),
+                location.getEndpoint(),
+                credentials,
+                maxBlockSize
+            ).getRight();
+            BlobStore existingBlobStore = readBlobStores.putIfAbsent(location, 
blobStore);
+            if (null == existingBlobStore) {
+                return blobStore;
+            } else {
+                return existingBlobStore;
+            }
+        } else {
+            return blobStore;
+        }
+    }
+
     @Override
-    public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID 
uid) {
+    public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid,
+                                                       Map<String, String> 
offloadDriverMetadata) {
+        String readBucket = getReadBucket(offloadDriverMetadata);
+        BlobStore readBlobstore = getReadBlobStore(offloadDriverMetadata);
+
         CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
         String key = dataBlockOffloadKey(ledgerId, uid);
         String indexKey = indexBlockOffloadKey(ledgerId, uid);
         scheduler.chooseThread(ledgerId).submit(() -> {
                 try {
                     
promise.complete(BlobStoreBackedReadHandleImpl.open(scheduler.chooseThread(ledgerId),
-                                                                 blobStore,
-                                                                 bucket, key, 
indexKey,
+                                                                 readBlobstore,
+                                                                 readBucket, 
key, indexKey,
                                                                  VERSION_CHECK,
                                                                  ledgerId, 
readBufferSize));
                 } catch (Throwable t) {
@@ -418,11 +541,15 @@ public class BlobStoreManagedLedgerOffloader implements 
LedgerOffloader {
 
 
     @Override
-    public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid) {
+    public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid,
+                                                   Map<String, String> 
offloadDriverMetadata) {
+        String readBucket = getReadBucket(offloadDriverMetadata);
+        BlobStore readBlobstore = getReadBlobStore(offloadDriverMetadata);
+
         CompletableFuture<Void> promise = new CompletableFuture<>();
         scheduler.chooseThread(ledgerId).submit(() -> {
             try {
-                blobStore.removeBlobs(bucket,
+                readBlobstore.removeBlobs(readBucket,
                     ImmutableList.of(dataBlockOffloadKey(ledgerId, uid), 
indexBlockOffloadKey(ledgerId, uid)));
                 promise.complete(null);
             } catch (Throwable t) {
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
index fca1ef2..eb88d37 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
@@ -30,6 +30,7 @@ import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -252,7 +253,7 @@ class BlobStoreManagedLedgerOffloaderTest extends 
BlobStoreTestBase {
         UUID uuid = UUID.randomUUID();
         offloader.offload(toWrite, uuid, new HashMap<>()).get();
 
-        ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), 
uuid).get();
+        ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid, 
Collections.emptyMap()).get();
         Assert.assertEquals(toTest.getLastAddConfirmed(), 
toWrite.getLastAddConfirmed());
 
         try (LedgerEntries toWriteEntries = toWrite.read(0, 
toWrite.getLastAddConfirmed());
@@ -406,7 +407,7 @@ class BlobStoreManagedLedgerOffloaderTest extends 
BlobStoreTestBase {
         UUID uuid = UUID.randomUUID();
         offloader.offload(toWrite, uuid, new HashMap<>()).get();
 
-        ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), 
uuid).get();
+        ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid, 
Collections.emptyMap()).get();
         Assert.assertEquals(toTest.getLastAddConfirmed(), 
toWrite.getLastAddConfirmed());
 
         for (long[] access : randomAccesses) {
@@ -438,7 +439,7 @@ class BlobStoreManagedLedgerOffloaderTest extends 
BlobStoreTestBase {
         UUID uuid = UUID.randomUUID();
         offloader.offload(toWrite, uuid, new HashMap<>()).get();
 
-        ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), 
uuid).get();
+        ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid, 
Collections.emptyMap()).get();
         Assert.assertEquals(toTest.getLastAddConfirmed(), 
toWrite.getLastAddConfirmed());
 
         try {
@@ -467,7 +468,7 @@ class BlobStoreManagedLedgerOffloaderTest extends 
BlobStoreTestBase {
         Assert.assertTrue(blobStore.blobExists(BUCKET, 
BlobStoreManagedLedgerOffloader.indexBlockOffloadKey(readHandle.getId(), 
uuid)));
 
         // verify object deleted after delete
-        offloader.deleteOffloaded(readHandle.getId(), uuid).get();
+        offloader.deleteOffloaded(readHandle.getId(), uuid, 
Collections.emptyMap()).get();
         Assert.assertFalse(blobStore.blobExists(BUCKET, 
BlobStoreManagedLedgerOffloader.dataBlockOffloadKey(readHandle.getId(), uuid)));
         Assert.assertFalse(blobStore.blobExists(BUCKET, 
BlobStoreManagedLedgerOffloader.indexBlockOffloadKey(readHandle.getId(), 
uuid)));
     }
@@ -492,7 +493,7 @@ class BlobStoreManagedLedgerOffloaderTest extends 
BlobStoreTestBase {
             Assert.assertTrue(blobStore.blobExists(BUCKET, 
BlobStoreManagedLedgerOffloader.dataBlockOffloadKey(readHandle.getId(), uuid)));
             Assert.assertTrue(blobStore.blobExists(BUCKET, 
BlobStoreManagedLedgerOffloader.indexBlockOffloadKey(readHandle.getId(), 
uuid)));
 
-            offloader.deleteOffloaded(readHandle.getId(), uuid).get();
+            offloader.deleteOffloaded(readHandle.getId(), uuid, 
Collections.emptyMap()).get();
         } catch (Exception e) {
             // expected
             
Assert.assertTrue(e.getCause().getMessage().contains(failureString));
@@ -542,7 +543,7 @@ class BlobStoreManagedLedgerOffloaderTest extends 
BlobStoreTestBase {
         
userMeta.put(BlobStoreManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY.toLowerCase(),
 String.valueOf(-12345));
         blobStore.copyBlob(BUCKET, dataKey, BUCKET, dataKey, 
CopyOptions.builder().userMetadata(userMeta).build());
 
-        try (ReadHandle toRead = offloader.readOffloaded(toWrite.getId(), 
uuid).get()) {
+        try (ReadHandle toRead = offloader.readOffloaded(toWrite.getId(), 
uuid, Collections.emptyMap()).get()) {
             toRead.readAsync(0, 0).get();
             Assert.fail("Shouldn't have been able to read");
         } catch (ExecutionException e) {
@@ -554,7 +555,7 @@ class BlobStoreManagedLedgerOffloaderTest extends 
BlobStoreTestBase {
         
userMeta.put(BlobStoreManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY.toLowerCase(),
 String.valueOf(12345));
         blobStore.copyBlob(BUCKET, dataKey, BUCKET, dataKey, 
CopyOptions.builder().userMetadata(userMeta).build());
 
-        try (ReadHandle toRead = offloader.readOffloaded(toWrite.getId(), 
uuid).get()) {
+        try (ReadHandle toRead = offloader.readOffloaded(toWrite.getId(), 
uuid, Collections.emptyMap()).get()) {
             toRead.readAsync(0, 0).get();
             Assert.fail("Shouldn't have been able to read");
         } catch (ExecutionException e) {
@@ -581,7 +582,7 @@ class BlobStoreManagedLedgerOffloaderTest extends 
BlobStoreTestBase {
         blobStore.copyBlob(BUCKET, indexKey, BUCKET, indexKey, 
CopyOptions.builder().userMetadata(userMeta).build());
 
         try {
-            offloader.readOffloaded(toWrite.getId(), uuid).get();
+            offloader.readOffloaded(toWrite.getId(), uuid, 
Collections.emptyMap()).get();
             Assert.fail("Shouldn't have been able to open");
         } catch (ExecutionException e) {
             Assert.assertEquals(e.getCause().getClass(), IOException.class);
@@ -592,7 +593,7 @@ class BlobStoreManagedLedgerOffloaderTest extends 
BlobStoreTestBase {
         blobStore.copyBlob(BUCKET, indexKey, BUCKET, indexKey, 
CopyOptions.builder().userMetadata(userMeta).build());
 
         try {
-            offloader.readOffloaded(toWrite.getId(), uuid).get();
+            offloader.readOffloaded(toWrite.getId(), uuid, 
Collections.emptyMap()).get();
             Assert.fail("Shouldn't have been able to open");
         } catch (ExecutionException e) {
             Assert.assertEquals(e.getCause().getClass(), IOException.class);

Reply via email to