This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 5980169 [tiered storage] store driver name and driver specific metadata in original ledger metadata (#2398) 5980169 is described below commit 59801695a9808c4c6574b421f90b0d91a9d712ad Author: Sijie Guo <guosi...@gmail.com> AuthorDate: Mon Aug 20 23:21:53 2018 -0700 [tiered storage] store driver name and driver specific metadata in original ledger metadata (#2398) ### Motivation 1) Currently the location of an offloaded ledger isn't stored in the original ledger metadata. That means that if the configuration is changed or modified by mistake, we might potentially cause data loss. 2) The location of an offloaded ledger is needed by Pulsar SQL, so it is very inconvenient to have the location information stored in a configuration, and the approach is also problematic. ### Changes Store `driverName` and driver-specific metadata (e.g. bucket name, region name, endpoint) in the original ledger metadata. Change ManagedLedgerImpl to use the driver-specific metadata to read the offloaded ledger. If the driver-specific metadata is missing, it will fall back to using the configuration. ### Tests This change doesn't change the behavior. Existing unit tests and integration tests already cover the logic. ### NOTES Currently the driver name in metadata is not used. 
We need to use driver name to load different offloader driver after #2393 is implemented --- .../apache/bookkeeper/mledger/LedgerOffloader.java | 33 ++- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 69 ++++++- .../mledger/impl/NullLedgerOffloader.java | 11 +- .../bookkeeper/mledger/offload/OffloadUtils.java | 91 +++++++++ managed-ledger/src/main/proto/MLDataFormats.proto | 11 + .../mledger/impl/OffloadPrefixReadTest.java | 27 ++- .../bookkeeper/mledger/impl/OffloadPrefixTest.java | 16 +- .../pulsar/broker/admin/AdminApiOffloadTest.java | 3 + tiered-storage/jcloud/pom.xml | 2 + .../impl/BlobStoreManagedLedgerOffloader.java | 221 ++++++++++++++++----- .../impl/BlobStoreManagedLedgerOffloaderTest.java | 19 +- 11 files changed, 419 insertions(+), 84 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java index 6885500..8fc35cc 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java @@ -20,6 +20,7 @@ package org.apache.bookkeeper.mledger; import com.google.common.annotations.Beta; +import java.util.Collections; import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -31,6 +32,25 @@ import org.apache.bookkeeper.client.api.ReadHandle; */ @Beta public interface LedgerOffloader { + + /** + * Get offload driver name. + * + * @return offload driver name. + */ + String getOffloadDriverName(); + + /** + * Get offload driver metadata. + * + * <p>The driver metadata will be recorded as part of the metadata of the original ledger. + * + * @return offload driver metadata. + */ + default Map<String, String> getOffloadDriverMetadata() { + return Collections.emptyMap(); + } + /** * Offload the passed in ledger to longterm storage. 
* Metadata passed in is for inspection purposes only and should be stored @@ -51,10 +71,9 @@ public interface LedgerOffloader { * * @param ledger the ledger to offload * @param uid unique id to identity this offload attempt - * @param extraMetadata metadata to be stored with the ledger for informational + * @param extraMetadata metadata to be stored with the offloaded ledger for informational * purposes - * @return a future, which when completed, denotes that the offload has been - * successful + * @return a future, which when completed, denotes that the offload has been successful. */ CompletableFuture<Void> offload(ReadHandle ledger, UUID uid, @@ -69,9 +88,11 @@ public interface LedgerOffloader { * * @param ledgerId the ID of the ledger to load from longterm storage * @param uid unique ID for previous successful offload attempt + * @param offloadDriverMetadata offload driver metadata * @return a future, which when completed, returns a ReadHandle */ - CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid); + CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid, + Map<String, String> offloadDriverMetadata); /** * Delete a ledger from long term storage. 
@@ -81,9 +102,11 @@ public interface LedgerOffloader { * * @param ledgerId the ID of the ledger to delete from longterm storage * @param uid unique ID for previous offload attempt + * @param offloadDriverMetadata offload driver metadata * @return a future, which when completed, signifies that the ledger has * been deleted */ - CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid); + CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid, + Map<String, String> offloadDriverMetadata); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 388cdef..6e4a609 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -96,6 +96,7 @@ import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback; import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback; import org.apache.bookkeeper.mledger.impl.MetaStore.Stat; +import org.apache.bookkeeper.mledger.offload.OffloadUtils; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.NestedPositionInfo; @@ -1390,7 +1391,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { if (info != null && info.hasOffloadContext() && info.getOffloadContext().getComplete()) { UUID uid = new UUID(info.getOffloadContext().getUidMsb(), info.getOffloadContext().getUidLsb()); - openFuture = config.getLedgerOffloader().readOffloaded(ledgerId, uid); + // TODO: improve this to load ledger offloader by driver name recorded in metadata + openFuture = config.getLedgerOffloader() + .readOffloaded(ledgerId, uid, 
OffloadUtils.getOffloadDriverMetadata(info)); } else { openFuture = bookKeeper.newOpenLedgerOp() .withRecovery(!isReadOnly()) @@ -1771,7 +1774,16 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } for (LedgerInfo ls : offloadedLedgersToDelete) { LedgerInfo.Builder newInfoBuilder = ls.toBuilder(); - newInfoBuilder.getOffloadContextBuilder().setBookkeeperDeleted(true); + newInfoBuilder.getOffloadContextBuilder() + .setBookkeeperDeleted(true); + String driverName = OffloadUtils.getOffloadDriverName( + ls, config.getLedgerOffloader().getOffloadDriverName()); + Map<String, String> driverMetadata = OffloadUtils.getOffloadDriverMetadata( + ls, config.getLedgerOffloader().getOffloadDriverMetadata()); + OffloadUtils.setOffloadDriverMetadata( + newInfoBuilder, + driverName, driverMetadata + ); ledgers.put(ls.getLedgerId(), newInfoBuilder.build()); } @@ -1903,7 +1915,11 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { if (info.getOffloadContext().hasUidMsb()) { UUID uuid = new UUID(info.getOffloadContext().getUidMsb(), info.getOffloadContext().getUidLsb()); - cleanupOffloaded(ledgerId, uuid, "Trimming"); + cleanupOffloaded( + ledgerId, uuid, + OffloadUtils.getOffloadDriverName(info, config.getLedgerOffloader().getOffloadDriverName()), + OffloadUtils.getOffloadDriverMetadata(info, config.getLedgerOffloader().getOffloadDriverMetadata()), + "Trimming"); } } @@ -2105,7 +2121,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { UUID uuid = UUID.randomUUID(); Map<String, String> extraMetadata = ImmutableMap.of("ManagedLedgerName", name); - prepareLedgerInfoForOffloaded(ledgerId, uuid) + String driverName = config.getLedgerOffloader().getOffloadDriverName(); + Map<String, String> driverMetadata = config.getLedgerOffloader().getOffloadDriverMetadata(); + + prepareLedgerInfoForOffloaded(ledgerId, uuid, driverName, driverMetadata) .thenCompose((ignore) -> getLedgerHandle(ledgerId)) 
.thenCompose(readHandle -> config.getLedgerOffloader().offload(readHandle, uuid, extraMetadata)) .thenCompose((ignore) -> { @@ -2116,7 +2135,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { scheduledExecutor, name) .whenComplete((ignore2, exception) -> { if (exception != null) { - cleanupOffloaded(ledgerId, uuid, "Metastore failure"); + cleanupOffloaded( + ledgerId, uuid, + driverName, driverMetadata, + "Metastore failure"); } }); }) @@ -2216,7 +2238,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } } - private CompletableFuture<Void> prepareLedgerInfoForOffloaded(long ledgerId, UUID uuid) { + private CompletableFuture<Void> prepareLedgerInfoForOffloaded(long ledgerId, + UUID uuid, + String offloadDriverName, + Map<String, String> offloadDriverMetadata) { log.info("[{}] Preparing metadata to offload ledger {} with uuid {}", name, ledgerId, uuid); return transformLedgerInfo(ledgerId, (oldInfo) -> { @@ -2225,12 +2250,24 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { oldInfo.getOffloadContext().getUidLsb()); log.info("[{}] Found previous offload attempt for ledger {}, uuid {}" + ", cleaning up", name, ledgerId, uuid); - cleanupOffloaded(ledgerId, oldUuid, "Previous failed offload"); + cleanupOffloaded( + ledgerId, + oldUuid, + OffloadUtils.getOffloadDriverName(oldInfo, + config.getLedgerOffloader().getOffloadDriverName()), + OffloadUtils.getOffloadDriverMetadata(oldInfo, + config.getLedgerOffloader().getOffloadDriverMetadata()), + "Previous failed offload"); } LedgerInfo.Builder builder = oldInfo.toBuilder(); builder.getOffloadContextBuilder() .setUidMsb(uuid.getMostSignificantBits()) .setUidLsb(uuid.getLeastSignificantBits()); + OffloadUtils.setOffloadDriverMetadata( + builder, + offloadDriverName, + offloadDriverMetadata + ); return builder.build(); }) .whenComplete((result, exception) -> { @@ -2254,6 +2291,16 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback { builder.getOffloadContextBuilder() .setTimestamp(clock.millis()) .setComplete(true); + + String driverName = OffloadUtils.getOffloadDriverName( + oldInfo, config.getLedgerOffloader().getOffloadDriverName()); + Map<String, String> driverMetadata = OffloadUtils.getOffloadDriverMetadata( + oldInfo, config.getLedgerOffloader().getOffloadDriverMetadata()); + OffloadUtils.setOffloadDriverMetadata( + builder, + driverName, + driverMetadata + ); return builder.build(); } else { throw new OffloadConflict( @@ -2272,10 +2319,14 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { }); } - private void cleanupOffloaded(long ledgerId, UUID uuid, String cleanupReason) { + private void cleanupOffloaded(long ledgerId, + UUID uuid, + String offloadDriverName, /* TODO: use driver name to identify offloader */ + Map<String, String> offloadDriverMetadata, + String cleanupReason) { Retries.run(Backoff.exponentialJittered(TimeUnit.SECONDS.toMillis(1), TimeUnit.SECONDS.toHours(1)).limit(10), Retries.NonFatalPredicate, - () -> config.getLedgerOffloader().deleteOffloaded(ledgerId, uuid), + () -> config.getLedgerOffloader().deleteOffloaded(ledgerId, uuid, offloadDriverMetadata), scheduledExecutor, name) .whenComplete((ignored, exception) -> { if (exception != null) { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java index cd76a1b..3401f1b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java @@ -32,6 +32,11 @@ public class NullLedgerOffloader implements LedgerOffloader { public static NullLedgerOffloader INSTANCE = new NullLedgerOffloader(); @Override + public String getOffloadDriverName() { + return "NullLedgerOffloader"; + } + + @Override public CompletableFuture<Void> 
offload(ReadHandle ledger, UUID uid, Map<String, String> extraMetadata) { @@ -41,14 +46,16 @@ public class NullLedgerOffloader implements LedgerOffloader { } @Override - public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid) { + public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid, + Map<String, String> offloadDriverMetadata) { CompletableFuture<ReadHandle> promise = new CompletableFuture<>(); promise.completeExceptionally(new UnsupportedOperationException()); return promise; } @Override - public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid) { + public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid, + Map<String, String> offloadDriverMetadata) { CompletableFuture<Void> promise = new CompletableFuture<>(); promise.completeExceptionally(new UnsupportedOperationException()); return promise; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java new file mode 100644 index 0000000..44ebc80 --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.offload; + +import com.google.common.collect.Maps; +import java.util.Map; +import org.apache.bookkeeper.mledger.proto.MLDataFormats.KeyValue; +import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; +import org.apache.bookkeeper.mledger.proto.MLDataFormats.OffloadContext; +import org.apache.bookkeeper.mledger.proto.MLDataFormats.OffloadDriverMetadata; + +public final class OffloadUtils { + + private OffloadUtils() {} + + public static Map<String, String> getOffloadDriverMetadata(LedgerInfo ledgerInfo) { + Map<String, String> metadata = Maps.newHashMap(); + if (ledgerInfo.hasOffloadContext()) { + OffloadContext ctx = ledgerInfo.getOffloadContext(); + if (ctx.hasDriverMetadata()) { + OffloadDriverMetadata driverMetadata = ctx.getDriverMetadata(); + if (driverMetadata.getPropertiesCount() > 0) { + driverMetadata.getPropertiesList().forEach(kv -> metadata.put(kv.getKey(), kv.getValue())); + } + } + } + return metadata; + } + + public static Map<String, String> getOffloadDriverMetadata(LedgerInfo ledgerInfo, + Map<String, String> defaultOffloadDriverMetadata) { + if (ledgerInfo.hasOffloadContext()) { + OffloadContext ctx = ledgerInfo.getOffloadContext(); + if (ctx.hasDriverMetadata()) { + OffloadDriverMetadata driverMetadata = ctx.getDriverMetadata(); + if (driverMetadata.getPropertiesCount() > 0) { + Map<String, String> metadata = Maps.newHashMap(); + driverMetadata.getPropertiesList().forEach(kv -> metadata.put(kv.getKey(), kv.getValue())); + return metadata; + } + } + } + return defaultOffloadDriverMetadata; + } + + public static String getOffloadDriverName(LedgerInfo ledgerInfo, String defaultDriverName) { + if (ledgerInfo.hasOffloadContext()) { + OffloadContext ctx = ledgerInfo.getOffloadContext(); + if (ctx.hasDriverMetadata()) { + OffloadDriverMetadata driverMetadata = 
ctx.getDriverMetadata(); + if (driverMetadata.hasName()) { + return driverMetadata.getName(); + } + } + } + return defaultDriverName; + } + + public static void setOffloadDriverMetadata(LedgerInfo.Builder infoBuilder, + String driverName, + Map<String, String> offloadDriverMetadata) { + infoBuilder.getOffloadContextBuilder() + .getDriverMetadataBuilder() + .setName(driverName); + offloadDriverMetadata.forEach((k, v) -> { + infoBuilder.getOffloadContextBuilder() + .getDriverMetadataBuilder() + .addProperties(KeyValue.newBuilder() + .setKey(k) + .setValue(v) + .build()); + }); + } + +} diff --git a/managed-ledger/src/main/proto/MLDataFormats.proto b/managed-ledger/src/main/proto/MLDataFormats.proto index 0d5ad3a..4dbd231 100644 --- a/managed-ledger/src/main/proto/MLDataFormats.proto +++ b/managed-ledger/src/main/proto/MLDataFormats.proto @@ -21,12 +21,23 @@ syntax = "proto2"; option java_package = "org.apache.bookkeeper.mledger.proto"; option optimize_for = SPEED; +message KeyValue { + required string key = 1; + required string value = 2; +} + +message OffloadDriverMetadata { + required string name = 1; + repeated KeyValue properties = 2; +} + message OffloadContext { optional int64 uidMsb = 1; optional int64 uidLsb = 2; optional bool complete = 3; optional bool bookkeeperDeleted = 4; optional int64 timestamp = 5; + optional OffloadDriverMetadata driverMetadata = 6; } message ManagedLedgerInfo { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java index 8bbb44a..4bf518f 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java @@ -19,6 +19,7 @@ package org.apache.bookkeeper.mledger.impl; import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyMap; import 
static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.spy; @@ -30,6 +31,7 @@ import com.google.common.collect.Lists; import io.netty.buffer.ByteBuf; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -51,7 +53,6 @@ import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.LedgerOffloader; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; -import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; @@ -98,25 +99,33 @@ public class OffloadPrefixReadTest extends MockedBookKeeperTestCase { for (Entry e : cursor.readEntries(10)) { Assert.assertEquals(new String(e.getData()), "entry-" + i++); } - verify(offloader, times(1)).readOffloaded(anyLong(), anyObject()); - verify(offloader).readOffloaded(anyLong(), eq(firstLedgerUUID)); + verify(offloader, times(1)) + .readOffloaded(anyLong(), anyObject(), anyMap()); + verify(offloader).readOffloaded(anyLong(), eq(firstLedgerUUID), anyMap()); for (Entry e : cursor.readEntries(10)) { Assert.assertEquals(new String(e.getData()), "entry-" + i++); } - verify(offloader, times(2)).readOffloaded(anyLong(), anyObject()); - verify(offloader).readOffloaded(anyLong(), eq(secondLedgerUUID)); + verify(offloader, times(2)) + .readOffloaded(anyLong(), anyObject(), anyMap()); + verify(offloader).readOffloaded(anyLong(), eq(secondLedgerUUID), anyMap()); for (Entry e : cursor.readEntries(5)) { Assert.assertEquals(new String(e.getData()), "entry-" + i++); } - verify(offloader, times(2)).readOffloaded(anyLong(), anyObject()); + verify(offloader, times(2)) + .readOffloaded(anyLong(), anyObject(), anyMap()); } static class MockLedgerOffloader implements LedgerOffloader { ConcurrentHashMap<UUID, ReadHandle> offloads = new ConcurrentHashMap<UUID, 
ReadHandle>(); @Override + public String getOffloadDriverName() { + return "mock"; + } + + @Override public CompletableFuture<Void> offload(ReadHandle ledger, UUID uuid, Map<String, String> extraMetadata) { @@ -131,12 +140,14 @@ public class OffloadPrefixReadTest extends MockedBookKeeperTestCase { } @Override - public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uuid) { + public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uuid, + Map<String, String> offloadDriverMetadata) { return CompletableFuture.completedFuture(offloads.get(uuid)); } @Override - public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uuid) { + public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uuid, + Map<String, String> offloadDriverMetadata) { offloads.remove(uuid); return CompletableFuture.completedFuture(null); }; diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java index 6d21ee2..351d86b 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java @@ -493,9 +493,10 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase { } @Override - public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uuid) { + public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uuid, + Map<String, String> offloadDriverMetadata) { deleted.add(Pair.of(ledgerId, uuid)); - return super.deleteOffloaded(ledgerId, uuid); + return super.deleteOffloaded(ledgerId, uuid, offloadDriverMetadata); } }; ManagedLedgerConfig config = new ManagedLedgerConfig(); @@ -929,6 +930,11 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase { } @Override + public String getOffloadDriverName() { + return "mock"; + } + + @Override public CompletableFuture<Void> 
offload(ReadHandle ledger, UUID uuid, Map<String, String> extraMetadata) { @@ -942,14 +948,16 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase { } @Override - public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uuid) { + public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uuid, + Map<String, String> offloadDriverMetadata) { CompletableFuture<ReadHandle> promise = new CompletableFuture<>(); promise.completeExceptionally(new UnsupportedOperationException()); return promise; } @Override - public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uuid) { + public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uuid, + Map<String, String> offloadDriverMetadata) { CompletableFuture<Void> promise = new CompletableFuture<>(); if (offloads.remove(ledgerId, uuid)) { deletes.put(ledgerId, uuid); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java index dc925a2..63b4d84 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java @@ -23,6 +23,7 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import com.google.common.collect.Sets; @@ -78,6 +79,8 @@ public class AdminApiOffloadTest extends MockedPulsarServiceBaseTest { private void testOffload(String topicName, String mlName) throws Exception { LedgerOffloader offloader = mock(LedgerOffloader.class); + when(offloader.getOffloadDriverName()).thenReturn("mock"); + doReturn(offloader).when(pulsar).getManagedLedgerOffloader(); CompletableFuture<Void> promise = new CompletableFuture<>(); diff --git a/tiered-storage/jcloud/pom.xml 
b/tiered-storage/jcloud/pom.xml index eb4636f..fcd3300 100644 --- a/tiered-storage/jcloud/pom.xml +++ b/tiered-storage/jcloud/pom.xml @@ -42,6 +42,7 @@ <groupId>org.apache.pulsar</groupId> <artifactId>jclouds-shaded</artifactId> <version>${project.version}</version> + <!-- <exclusions> <exclusion> <groupId>com.google.code.gson</groupId> @@ -68,6 +69,7 @@ <artifactId>*</artifactId> </exclusion> </exclusions> + --> </dependency> <dependency> <groupId>com.amazonaws</groupId> diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java index e09a66b..f96afaf 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java @@ -36,6 +36,9 @@ import java.util.Map; import java.util.Properties; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import lombok.Data; import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.mledger.LedgerOffloader; @@ -43,6 +46,7 @@ import org.apache.bookkeeper.mledger.offload.jcloud.BlockAwareSegmentInputStream import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock; import org.apache.bookkeeper.mledger.offload.jcloud.TieredStorageConfigurationData; import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder; +import org.apache.commons.lang3.tuple.Pair; import org.jclouds.Constants; import org.jclouds.ContextBuilder; import org.jclouds.blobstore.BlobStore; @@ -66,6 +70,10 @@ import org.slf4j.LoggerFactory; public class 
BlobStoreManagedLedgerOffloader implements LedgerOffloader { private static final Logger log = LoggerFactory.getLogger(BlobStoreManagedLedgerOffloader.class); + private static final String METADATA_FIELD_BUCKET = "bucket"; + private static final String METADATA_FIELD_REGION = "region"; + private static final String METADATA_FIELD_ENDPOINT = "endpoint"; + public static final String[] DRIVER_NAMES = {"S3", "aws-s3", "google-cloud-storage"}; // use these keys for both s3 and gcs. @@ -91,6 +99,42 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader { blobBuilder.userMetadata(metadataBuilder.build()); } + @Data(staticConstructor = "of") + private static class BlobStoreLocation { + private final String region; + private final String endpoint; + } + + private static Pair<BlobStoreLocation, BlobStore> createBlobStore(String driver, + String region, + String endpoint, + Credentials credentials, + int maxBlockSize) { + Properties overrides = new Properties(); + // This property controls the number of parts being uploaded in parallel. 
+ overrides.setProperty("jclouds.mpu.parallel.degree", "1"); + overrides.setProperty("jclouds.mpu.parts.size", Integer.toString(maxBlockSize)); + overrides.setProperty(Constants.PROPERTY_SO_TIMEOUT, "25000"); + overrides.setProperty(Constants.PROPERTY_MAX_RETRIES, Integer.toString(100)); + + ContextBuilder contextBuilder = ContextBuilder.newBuilder(driver); + contextBuilder.credentials(credentials.identity, credentials.credential); + + if (isS3Driver(driver) && !Strings.isNullOrEmpty(endpoint)) { + contextBuilder.endpoint(endpoint); + overrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "false"); + } + contextBuilder.overrides(overrides); + BlobStoreContext context = contextBuilder.buildView(BlobStoreContext.class); + BlobStore blobStore = context.getBlobStore(); + + log.info("Connect to blobstore : driver: {}, region: {}, endpoint: {}", + driver, region, endpoint); + return Pair.of( + BlobStoreLocation.of(region, endpoint), + blobStore); + } + private final VersionCheck VERSION_CHECK = (key, blob) -> { // NOTE all metadata in jclouds comes out as lowercase, in an effort to normalize the providers String version = blob.getMetadata().getUserMetadata().get(METADATA_FORMAT_VERSION_KEY.toLowerCase()); @@ -102,16 +146,28 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader { private final OrderedScheduler scheduler; - // container in jclouds - private final String bucket; + // container in jclouds to write offloaded ledgers + private final String writeBucket; + // the region to write offloaded ledgers + private final String writeRegion; + // the endpoint + private final String writeEndpoint; + // credentials + private final Credentials credentials; + // max block size for each data block. 
private int maxBlockSize; private final int readBufferSize; - private BlobStoreContext context; - private BlobStore blobStore; - Location location = null; + private final BlobStore writeBlobStore; + private final Location writeLocation; + + private final ConcurrentMap<BlobStoreLocation, BlobStore> readBlobStores = new ConcurrentHashMap<>(); + + // metadata to be stored as part of the offloaded ledger metadata private final Map<String, String> userMetadata; + // offload driver metadata to be stored as part of the original ledger metadata + private final String offloadDriverName; @VisibleForTesting static BlobStoreManagedLedgerOffloader create(TieredStorageConfigurationData conf, @@ -124,6 +180,9 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader { OrderedScheduler scheduler) throws IOException { String driver = conf.getManagedLedgerOffloadDriver(); + if ("s3".equals(driver.toLowerCase())) { + driver = "aws-s3"; + } if (!driverSupported(driver)) { throw new IOException( "Not support this kind of driver as offload backend: " + driver); @@ -217,35 +276,34 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader { int maxBlockSize, int readBufferSize, String endpoint, String region, Credentials credentials, Map<String, String> userMetadata) { + this.offloadDriverName = driver; this.scheduler = scheduler; this.readBufferSize = readBufferSize; - this.bucket = container; + this.writeBucket = container; + this.writeRegion = region; + this.writeEndpoint = endpoint; this.maxBlockSize = maxBlockSize; this.userMetadata = userMetadata; + this.credentials = credentials; - Properties overrides = new Properties(); - // This property controls the number of parts being uploaded in parallel. 
- overrides.setProperty("jclouds.mpu.parallel.degree", "1"); - overrides.setProperty("jclouds.mpu.parts.size", Integer.toString(maxBlockSize)); - overrides.setProperty(Constants.PROPERTY_SO_TIMEOUT, "25000"); - overrides.setProperty(Constants.PROPERTY_MAX_RETRIES, Integer.toString(100)); - - ContextBuilder contextBuilder = ContextBuilder.newBuilder(driver); - contextBuilder.credentials(credentials.identity, credentials.credential); - - if (isS3Driver(driver) && !Strings.isNullOrEmpty(endpoint)) { - contextBuilder.endpoint(endpoint); - overrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "false"); - } if (!Strings.isNullOrEmpty(region)) { - this.location = new LocationBuilder().scope(LocationScope.REGION).id(region).description(region).build(); + this.writeLocation = new LocationBuilder() + .scope(LocationScope.REGION) + .id(region) + .description(region) + .build(); + } else { + this.writeLocation = null; } - log.info("Constructor driver: {}, host: {}, container: {}, region: {} ", driver, endpoint, bucket, region); + log.info("Constructor offload driver: {}, host: {}, container: {}, region: {} ", + driver, endpoint, container, region); - contextBuilder.overrides(overrides); - this.context = contextBuilder.buildView(BlobStoreContext.class); - this.blobStore = context.getBlobStore(); + Pair<BlobStoreLocation, BlobStore> blobStore = createBlobStore( + driver, region, endpoint, credentials, maxBlockSize + ); + this.writeBlobStore = blobStore.getRight(); + this.readBlobStores.put(blobStore.getLeft(), blobStore.getRight()); } // build context for jclouds BlobStoreContext, mostly used in test @@ -258,12 +316,22 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader { BlobStoreManagedLedgerOffloader(BlobStore blobStore, String container, OrderedScheduler scheduler, int maxBlockSize, int readBufferSize, Map<String, String> userMetadata) { + this.offloadDriverName = "aws-s3"; this.scheduler = scheduler; this.readBufferSize = readBufferSize; 
- this.bucket = container; + this.writeBucket = container; + this.writeRegion = null; + this.writeEndpoint = null; this.maxBlockSize = maxBlockSize; - this.blobStore = blobStore; + this.writeBlobStore = blobStore; + this.writeLocation = null; this.userMetadata = userMetadata; + this.credentials = null; + + readBlobStores.put( + BlobStoreLocation.of(writeRegion, writeEndpoint), + blobStore + ); } static String dataBlockOffloadKey(long ledgerId, UUID uuid) { @@ -274,12 +342,26 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader { return String.format("%s-ledger-%d-index", uuid.toString(), ledgerId); } - public boolean createBucket() { - return blobStore.createContainerInLocation(location, bucket); + public boolean createBucket(String bucket) { + return writeBlobStore.createContainerInLocation(writeLocation, bucket); + } + + public void deleteBucket(String bucket) { + writeBlobStore.deleteContainer(bucket); + } + + @Override + public String getOffloadDriverName() { + return offloadDriverName; } - public void deleteBucket() { - blobStore.deleteContainer(bucket); + @Override + public Map<String, String> getOffloadDriverMetadata() { + return ImmutableMap.of( + METADATA_FIELD_BUCKET, writeBucket, + METADATA_FIELD_REGION, writeRegion, + METADATA_FIELD_ENDPOINT, writeEndpoint + ); } // upload DataBlock to s3 using MultiPartUpload, and indexBlock in a new Block, @@ -305,10 +387,10 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader { // init multi part upload for data block. 
try { - BlobBuilder blobBuilder = blobStore.blobBuilder(dataBlockKey); + BlobBuilder blobBuilder = writeBlobStore.blobBuilder(dataBlockKey); addVersionInfo(blobBuilder, userMetadata); Blob blob = blobBuilder.build(); - mpu = blobStore.initiateMultipartUpload(bucket, blob.getMetadata(), new PutOptions()); + mpu = writeBlobStore.initiateMultipartUpload(writeBucket, blob.getMetadata(), new PutOptions()); } catch (Throwable t) { promise.completeExceptionally(t); return; @@ -330,9 +412,9 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader { Payload partPayload = Payloads.newInputStreamPayload(blockStream); partPayload.getContentMetadata().setContentLength((long)blockSize); partPayload.getContentMetadata().setContentType("application/octet-stream"); - parts.add(blobStore.uploadMultipartPart(mpu, partId, partPayload)); + parts.add(writeBlobStore.uploadMultipartPart(mpu, partId, partPayload)); log.debug("UploadMultipartPart. container: {}, blobName: {}, partId: {}, mpu: {}", - bucket, dataBlockKey, partId, mpu.id()); + writeBucket, dataBlockKey, partId, mpu.id()); indexBuilder.addBlock(startEntry, partId, blockSize); @@ -349,16 +431,16 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader { dataObjectLength += blockSize; } - blobStore.completeMultipartUpload(mpu, parts); + writeBlobStore.completeMultipartUpload(mpu, parts); mpu = null; } catch (Throwable t) { try { if (mpu != null) { - blobStore.abortMultipartUpload(mpu); + writeBlobStore.abortMultipartUpload(mpu); } } catch (Throwable throwable) { log.error("Failed abortMultipartUpload in bucket - {} with key - {}, uploadId - {}.", - bucket, dataBlockKey, mpu.id(), throwable); + writeBucket, dataBlockKey, mpu.id(), throwable); } promise.completeExceptionally(t); return; @@ -368,7 +450,7 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader { try (OffloadIndexBlock index = indexBuilder.withDataObjectLength(dataObjectLength).build(); 
OffloadIndexBlock.IndexInputStream indexStream = index.toStream()) { // write the index block - BlobBuilder blobBuilder = blobStore.blobBuilder(indexBlockKey); + BlobBuilder blobBuilder = writeBlobStore.blobBuilder(indexBlockKey); addVersionInfo(blobBuilder, userMetadata); Payload indexPayload = Payloads.newInputStreamPayload(indexStream); indexPayload.getContentMetadata().setContentLength((long)indexStream.getStreamSize()); @@ -379,14 +461,14 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader { .contentLength((long)indexStream.getStreamSize()) .build(); - blobStore.putBlob(bucket, blob); + writeBlobStore.putBlob(writeBucket, blob); promise.complete(null); } catch (Throwable t) { try { - blobStore.removeBlob(bucket, dataBlockKey); + writeBlobStore.removeBlob(writeBucket, dataBlockKey); } catch (Throwable throwable) { log.error("Failed deleteObject in bucket - {} with key - {}.", - bucket, dataBlockKey, throwable); + writeBucket, dataBlockKey, throwable); } promise.completeExceptionally(t); return; @@ -395,16 +477,57 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader { return promise; } + String getReadRegion(Map<String, String> offloadDriverMetadata) { + return offloadDriverMetadata.getOrDefault(METADATA_FIELD_REGION, writeRegion); + } + + String getReadBucket(Map<String, String> offloadDriverMetadata) { + return offloadDriverMetadata.getOrDefault(METADATA_FIELD_BUCKET, writeBucket); + } + + String getReadEndpoint(Map<String, String> offloadDriverMetadata) { + return offloadDriverMetadata.getOrDefault(METADATA_FIELD_ENDPOINT, writeEndpoint); + } + + BlobStore getReadBlobStore(Map<String, String> offloadDriverMetadata) { + BlobStoreLocation location = BlobStoreLocation.of( + getReadRegion(offloadDriverMetadata), + getReadEndpoint(offloadDriverMetadata) + ); + BlobStore blobStore = readBlobStores.get(location); + if (null == blobStore) { + blobStore = createBlobStore( + offloadDriverName, + location.getRegion(), + 
location.getEndpoint(), + credentials, + maxBlockSize + ).getRight(); + BlobStore existingBlobStore = readBlobStores.putIfAbsent(location, blobStore); + if (null == existingBlobStore) { + return blobStore; + } else { + return existingBlobStore; + } + } else { + return blobStore; + } + } + @Override - public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid) { + public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid, + Map<String, String> offloadDriverMetadata) { + String readBucket = getReadBucket(offloadDriverMetadata); + BlobStore readBlobstore = getReadBlobStore(offloadDriverMetadata); + CompletableFuture<ReadHandle> promise = new CompletableFuture<>(); String key = dataBlockOffloadKey(ledgerId, uid); String indexKey = indexBlockOffloadKey(ledgerId, uid); scheduler.chooseThread(ledgerId).submit(() -> { try { promise.complete(BlobStoreBackedReadHandleImpl.open(scheduler.chooseThread(ledgerId), - blobStore, - bucket, key, indexKey, + readBlobstore, + readBucket, key, indexKey, VERSION_CHECK, ledgerId, readBufferSize)); } catch (Throwable t) { @@ -418,11 +541,15 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader { @Override - public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid) { + public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid, + Map<String, String> offloadDriverMetadata) { + String readBucket = getReadBucket(offloadDriverMetadata); + BlobStore readBlobstore = getReadBlobStore(offloadDriverMetadata); + CompletableFuture<Void> promise = new CompletableFuture<>(); scheduler.chooseThread(ledgerId).submit(() -> { try { - blobStore.removeBlobs(bucket, + readBlobstore.removeBlobs(readBucket, ImmutableList.of(dataBlockOffloadKey(ledgerId, uid), indexBlockOffloadKey(ledgerId, uid))); promise.complete(null); } catch (Throwable t) { diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java 
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java index fca1ef2..eb88d37 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java @@ -30,6 +30,7 @@ import java.io.File; import java.io.IOException; import java.lang.reflect.Method; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -252,7 +253,7 @@ class BlobStoreManagedLedgerOffloaderTest extends BlobStoreTestBase { UUID uuid = UUID.randomUUID(); offloader.offload(toWrite, uuid, new HashMap<>()).get(); - ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid).get(); + ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid, Collections.emptyMap()).get(); Assert.assertEquals(toTest.getLastAddConfirmed(), toWrite.getLastAddConfirmed()); try (LedgerEntries toWriteEntries = toWrite.read(0, toWrite.getLastAddConfirmed()); @@ -406,7 +407,7 @@ class BlobStoreManagedLedgerOffloaderTest extends BlobStoreTestBase { UUID uuid = UUID.randomUUID(); offloader.offload(toWrite, uuid, new HashMap<>()).get(); - ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid).get(); + ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid, Collections.emptyMap()).get(); Assert.assertEquals(toTest.getLastAddConfirmed(), toWrite.getLastAddConfirmed()); for (long[] access : randomAccesses) { @@ -438,7 +439,7 @@ class BlobStoreManagedLedgerOffloaderTest extends BlobStoreTestBase { UUID uuid = UUID.randomUUID(); offloader.offload(toWrite, uuid, new HashMap<>()).get(); - ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid).get(); + ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid, 
Collections.emptyMap()).get(); Assert.assertEquals(toTest.getLastAddConfirmed(), toWrite.getLastAddConfirmed()); try { @@ -467,7 +468,7 @@ class BlobStoreManagedLedgerOffloaderTest extends BlobStoreTestBase { Assert.assertTrue(blobStore.blobExists(BUCKET, BlobStoreManagedLedgerOffloader.indexBlockOffloadKey(readHandle.getId(), uuid))); // verify object deleted after delete - offloader.deleteOffloaded(readHandle.getId(), uuid).get(); + offloader.deleteOffloaded(readHandle.getId(), uuid, Collections.emptyMap()).get(); Assert.assertFalse(blobStore.blobExists(BUCKET, BlobStoreManagedLedgerOffloader.dataBlockOffloadKey(readHandle.getId(), uuid))); Assert.assertFalse(blobStore.blobExists(BUCKET, BlobStoreManagedLedgerOffloader.indexBlockOffloadKey(readHandle.getId(), uuid))); } @@ -492,7 +493,7 @@ class BlobStoreManagedLedgerOffloaderTest extends BlobStoreTestBase { Assert.assertTrue(blobStore.blobExists(BUCKET, BlobStoreManagedLedgerOffloader.dataBlockOffloadKey(readHandle.getId(), uuid))); Assert.assertTrue(blobStore.blobExists(BUCKET, BlobStoreManagedLedgerOffloader.indexBlockOffloadKey(readHandle.getId(), uuid))); - offloader.deleteOffloaded(readHandle.getId(), uuid).get(); + offloader.deleteOffloaded(readHandle.getId(), uuid, Collections.emptyMap()).get(); } catch (Exception e) { // expected Assert.assertTrue(e.getCause().getMessage().contains(failureString)); @@ -542,7 +543,7 @@ class BlobStoreManagedLedgerOffloaderTest extends BlobStoreTestBase { userMeta.put(BlobStoreManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY.toLowerCase(), String.valueOf(-12345)); blobStore.copyBlob(BUCKET, dataKey, BUCKET, dataKey, CopyOptions.builder().userMetadata(userMeta).build()); - try (ReadHandle toRead = offloader.readOffloaded(toWrite.getId(), uuid).get()) { + try (ReadHandle toRead = offloader.readOffloaded(toWrite.getId(), uuid, Collections.emptyMap()).get()) { toRead.readAsync(0, 0).get(); Assert.fail("Shouldn't have been able to read"); } catch (ExecutionException e) { @@ 
-554,7 +555,7 @@ class BlobStoreManagedLedgerOffloaderTest extends BlobStoreTestBase { userMeta.put(BlobStoreManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY.toLowerCase(), String.valueOf(12345)); blobStore.copyBlob(BUCKET, dataKey, BUCKET, dataKey, CopyOptions.builder().userMetadata(userMeta).build()); - try (ReadHandle toRead = offloader.readOffloaded(toWrite.getId(), uuid).get()) { + try (ReadHandle toRead = offloader.readOffloaded(toWrite.getId(), uuid, Collections.emptyMap()).get()) { toRead.readAsync(0, 0).get(); Assert.fail("Shouldn't have been able to read"); } catch (ExecutionException e) { @@ -581,7 +582,7 @@ class BlobStoreManagedLedgerOffloaderTest extends BlobStoreTestBase { blobStore.copyBlob(BUCKET, indexKey, BUCKET, indexKey, CopyOptions.builder().userMetadata(userMeta).build()); try { - offloader.readOffloaded(toWrite.getId(), uuid).get(); + offloader.readOffloaded(toWrite.getId(), uuid, Collections.emptyMap()).get(); Assert.fail("Shouldn't have been able to open"); } catch (ExecutionException e) { Assert.assertEquals(e.getCause().getClass(), IOException.class); @@ -592,7 +593,7 @@ class BlobStoreManagedLedgerOffloaderTest extends BlobStoreTestBase { blobStore.copyBlob(BUCKET, indexKey, BUCKET, indexKey, CopyOptions.builder().userMetadata(userMeta).build()); try { - offloader.readOffloaded(toWrite.getId(), uuid).get(); + offloader.readOffloaded(toWrite.getId(), uuid, Collections.emptyMap()).get(); Assert.fail("Shouldn't have been able to open"); } catch (ExecutionException e) { Assert.assertEquals(e.getCause().getClass(), IOException.class);