zymap commented on a change in pull request #9202:
URL: https://github.com/apache/pulsar/pull/9202#discussion_r568319131
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -386,6 +408,292 @@ public void operationFailed(MetaStoreException e) {
scheduleRollOverLedgerTask();
}
+ /**
+ * Should be called after `ledgers` were initialized.
+ */
+ void initializeStreamingOffloader() {
+ if (getOffloadMethod() == OffloadMethod.STREAMING_BASED) {
+ log.info("Streaming offload enabled for managed ledger: {}", name);
+ } else {
+ log.info("Streaming offload not enabled for managed ledger: {}",
name);
+ return;
+ }
+
+ if (!offloadMutex.tryLock()) {
+ log.info("try streaming offload,but already offloading");
+ return;
+ }
+
+ //get newest config and drop progress status of last offload
+ offloader = getConfig().getLedgerOffloader().fork();
+
+ this.offloadSegments = Queues.newConcurrentLinkedQueue();
+
+ initializeSegments();
+
+ if (offloadSegments.isEmpty()) {
+ log.error("Streaming offloading began but there is no segments to
offload, should not happen.");
+ throw new RuntimeException(
+ "Streaming offloading began but there is no segments to
offload, should not happen.");
+ }
+
+ startOffload();
+ }
+
+ private void initializeSegments() {
+ Long updatedLedgerId = null;
+ LedgerInfo updatedLedgerInfo = null;
+ for (Map.Entry<Long, LedgerInfo> idInfo : ledgers.entrySet()) {
+
+ final Long ledgerId = idInfo.getKey();
+ LedgerInfo ledgerInfo = idInfo.getValue();
+ String driverName = OffloadUtils.getOffloadDriverName(ledgerInfo,
+ config.getLedgerOffloader().getOffloadDriverName());
+ Map<String, String> driverMetadata =
OffloadUtils.getOffloadDriverMetadata(ledgerInfo,
+ config.getLedgerOffloader().getOffloadDriverMetadata());
+
+ if (!ledgerInfo.hasOffloadContext()) {
+ final OffloadContext context = OffloadContext.newBuilder()
+ .setComplete(false)
+ .build();
+ ledgerInfo =
ledgerInfo.toBuilder().setOffloadContext(context).build();
+ }
+
+ if (!isStreamingOffloadCompleted(ledgerInfo)) {
+ List<OffloadSegment> newSegments = Lists.newArrayList();
+ // Continue from incomplete context
+ long beginEntry = 0;
+ for (OffloadSegment offloadSegment :
ledgerInfo.getOffloadContext().getOffloadSegmentList()) {
+ if (offloadSegment.getComplete()) {
+ if (!offloadSegment.hasEndEntryId()) {
+ log.error("segment of ledger {} offload completed
bug not have end entry id "
+ + "should not happen. {}", ledgerId,
ledgerInfo);
+ } else {
+ beginEntry = offloadSegment.getEndEntryId() + 1;
+ newSegments.add(offloadSegment);
+ }
+ }
+ }
+
+ UUID uuid = UUID.randomUUID();
+ final OffloadSegment.Builder segment =
OffloadSegment.newBuilder()
+ .setUidLsb(uuid.getLeastSignificantBits())
+ .setUidMsb(uuid.getMostSignificantBits())
+ .setAssignedTimestamp(System.currentTimeMillis())
+ .setComplete(false);
+ OffloadUtils.setOffloadDriverMetadata(segment, driverName,
driverMetadata);
+ newSegments.add(segment.build());
+ final OffloadContext context =
ledgerInfo.getOffloadContext().toBuilder().clearOffloadSegment()
+ .addAllOffloadSegment(newSegments).build();
+ final LedgerInfo newLedgerInfo =
ledgerInfo.toBuilder().setOffloadContext(context).build();
+ updatedLedgerId = idInfo.getKey();
+ updatedLedgerInfo = newLedgerInfo;
+ offloadSegments.add(new OffloadSegmentInfoImpl(uuid, ledgerId,
beginEntry, driverName, driverMetadata));
+ break;
+ }
+ }
+ log.debug("updated ledgerId: {}", updatedLedgerId);
+ ledgers.put(updatedLedgerId, updatedLedgerInfo);
+ }
+
+ public static boolean isStreamingOffloadCompleted(LedgerInfo ledgerInfo) {
Review comment:
is this for verifying the previous offload status?
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -386,6 +408,292 @@ public void operationFailed(MetaStoreException e) {
scheduleRollOverLedgerTask();
}
+ /**
+ * Should be called after `ledgers` were initialized.
+ */
+ void initializeStreamingOffloader() {
+ if (getOffloadMethod() == OffloadMethod.STREAMING_BASED) {
+ log.info("Streaming offload enabled for managed ledger: {}", name);
+ } else {
+ log.info("Streaming offload not enabled for managed ledger: {}",
name);
+ return;
+ }
+
+ if (!offloadMutex.tryLock()) {
+ log.info("try streaming offload,but already offloading");
+ return;
+ }
+
+ //get newest config and drop progress status of last offload
+ offloader = getConfig().getLedgerOffloader().fork();
+
+ this.offloadSegments = Queues.newConcurrentLinkedQueue();
+
+ initializeSegments();
+
+ if (offloadSegments.isEmpty()) {
+ log.error("Streaming offloading began but there is no segments to
offload, should not happen.");
+ throw new RuntimeException(
+ "Streaming offloading began but there is no segments to
offload, should not happen.");
+ }
+
+ startOffload();
+ }
+
+ private void initializeSegments() {
+ Long updatedLedgerId = null;
+ LedgerInfo updatedLedgerInfo = null;
+ for (Map.Entry<Long, LedgerInfo> idInfo : ledgers.entrySet()) {
+
+ final Long ledgerId = idInfo.getKey();
+ LedgerInfo ledgerInfo = idInfo.getValue();
+ String driverName = OffloadUtils.getOffloadDriverName(ledgerInfo,
+ config.getLedgerOffloader().getOffloadDriverName());
+ Map<String, String> driverMetadata =
OffloadUtils.getOffloadDriverMetadata(ledgerInfo,
+ config.getLedgerOffloader().getOffloadDriverMetadata());
+
+ if (!ledgerInfo.hasOffloadContext()) {
+ final OffloadContext context = OffloadContext.newBuilder()
+ .setComplete(false)
+ .build();
+ ledgerInfo =
ledgerInfo.toBuilder().setOffloadContext(context).build();
+ }
+
+ if (!isStreamingOffloadCompleted(ledgerInfo)) {
+ List<OffloadSegment> newSegments = Lists.newArrayList();
+ // Continue from incomplete context
+ long beginEntry = 0;
+ for (OffloadSegment offloadSegment :
ledgerInfo.getOffloadContext().getOffloadSegmentList()) {
+ if (offloadSegment.getComplete()) {
+ if (!offloadSegment.hasEndEntryId()) {
+ log.error("segment of ledger {} offload completed
bug not have end entry id "
Review comment:
```suggestion
                            log.error("segment of ledger {} offload completed but does not have the end entry id "
```
##########
File path:
pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
##########
@@ -1447,8 +1460,19 @@ void run() throws PulsarAdminException {
}
}
- OffloadPolicies offloadPolicies = OffloadPolicies.create(driver,
region, bucket, endpoint, awsId, awsSecret, maxBlockSizeInBytes
- , readBufferSizeInBytes, offloadThresholdInBytes,
offloadDeletionLagInMillis, offloadedReadPriority);
+ OffloadPolicies offloadPolicies = OffloadPolicies
+ .create(driver, region, bucket, endpoint, awsId,
awsSecret, maxBlockSizeInBytes
+ , readBufferSizeInBytes, offloadThresholdInBytes,
offloadDeletionLagInMillis,
+ offloadedReadPriority);
+
+
+ if (offloadMethod != null) {
Review comment:
same as above
##########
File path:
pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
##########
@@ -809,9 +833,10 @@ public void topics() throws Exception {
verify(mockTopics).removeDelayedDeliveryPolicy("persistent://myprop/clust/ns1/ds1")
;
cmdTopics.run(split(
- "set-offload-policies persistent://myprop/clust/ns1/ds1 -d s3
-r region -b bucket -e endpoint -m 8 -rb 9 -t 10 -orp tiered-storage-first"));
+ "set-offload-policies persistent://myprop/clust/ns1/ds1 -d s3
-r region -b bucket -e endpoint -m 8 -rb 9 -t 10 -orp tiered-storage-first
--offloadMethod streaming-based"));
Review comment:
same as above
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -1724,7 +1725,15 @@ public TopicStats getStats(boolean getPreciseBacklog) {
info.ledgerId = li.getLedgerId();
info.entries = li.getEntries();
info.size = li.getSize();
- info.offloaded = li.hasOffloadContext() &&
li.getOffloadContext().getComplete();
+ info.offloaded = li.hasOffloadContext() &&
(li.getOffloadContext().getComplete()
+ || ManagedLedgerImpl.isStreamingOffloadCompleted(li));
+ if (info.offloaded) {
+ if (li.getOffloadContext().getComplete()) {
+ info.offloadMethod =
OffloadMethod.LEDGER_BASED.getStrValue();
+ } else {
+ info.offloadMethod =
OffloadMethod.STREAMING_BASED.getStrValue();
+ }
+ }
Review comment:
else info.offloadMethod = OffloadMethod.NONE?
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -465,7 +773,7 @@ public void operationFailed(MetaStoreException e) {
LedgerInfo info =
LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build();
ledgers.put(lh.getId(), info);
-
+ initializeStreamingOffloader();
Review comment:
Why not initialize the streaming offload in the ManagedLedger
initialization?
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -386,6 +408,292 @@ public void operationFailed(MetaStoreException e) {
scheduleRollOverLedgerTask();
}
+ /**
+ * Should be called after `ledgers` were initialized.
+ */
+ void initializeStreamingOffloader() {
+ if (getOffloadMethod() == OffloadMethod.STREAMING_BASED) {
+ log.info("Streaming offload enabled for managed ledger: {}", name);
+ } else {
+ log.info("Streaming offload not enabled for managed ledger: {}",
name);
+ return;
+ }
+
+ if (!offloadMutex.tryLock()) {
+ log.info("try streaming offload,but already offloading");
+ return;
+ }
+
+ //get newest config and drop progress status of last offload
+ offloader = getConfig().getLedgerOffloader().fork();
+
+ this.offloadSegments = Queues.newConcurrentLinkedQueue();
+
+ initializeSegments();
+
+ if (offloadSegments.isEmpty()) {
+ log.error("Streaming offloading began but there is no segments to
offload, should not happen.");
+ throw new RuntimeException(
+ "Streaming offloading began but there is no segments to
offload, should not happen.");
+ }
+
+ startOffload();
+ }
+
+ private void initializeSegments() {
+ Long updatedLedgerId = null;
+ LedgerInfo updatedLedgerInfo = null;
+ for (Map.Entry<Long, LedgerInfo> idInfo : ledgers.entrySet()) {
+
+ final Long ledgerId = idInfo.getKey();
+ LedgerInfo ledgerInfo = idInfo.getValue();
+ String driverName = OffloadUtils.getOffloadDriverName(ledgerInfo,
+ config.getLedgerOffloader().getOffloadDriverName());
+ Map<String, String> driverMetadata =
OffloadUtils.getOffloadDriverMetadata(ledgerInfo,
+ config.getLedgerOffloader().getOffloadDriverMetadata());
+
+ if (!ledgerInfo.hasOffloadContext()) {
+ final OffloadContext context = OffloadContext.newBuilder()
+ .setComplete(false)
+ .build();
+ ledgerInfo =
ledgerInfo.toBuilder().setOffloadContext(context).build();
+ }
+
+ if (!isStreamingOffloadCompleted(ledgerInfo)) {
+ List<OffloadSegment> newSegments = Lists.newArrayList();
+ // Continue from incomplete context
+ long beginEntry = 0;
+ for (OffloadSegment offloadSegment :
ledgerInfo.getOffloadContext().getOffloadSegmentList()) {
+ if (offloadSegment.getComplete()) {
+ if (!offloadSegment.hasEndEntryId()) {
+ log.error("segment of ledger {} offload completed
bug not have end entry id "
+ + "should not happen. {}", ledgerId,
ledgerInfo);
+ } else {
+ beginEntry = offloadSegment.getEndEntryId() + 1;
+ newSegments.add(offloadSegment);
+ }
+ }
+ }
+
+ UUID uuid = UUID.randomUUID();
+ final OffloadSegment.Builder segment =
OffloadSegment.newBuilder()
+ .setUidLsb(uuid.getLeastSignificantBits())
+ .setUidMsb(uuid.getMostSignificantBits())
+ .setAssignedTimestamp(System.currentTimeMillis())
+ .setComplete(false);
+ OffloadUtils.setOffloadDriverMetadata(segment, driverName,
driverMetadata);
+ newSegments.add(segment.build());
+ final OffloadContext context =
ledgerInfo.getOffloadContext().toBuilder().clearOffloadSegment()
+ .addAllOffloadSegment(newSegments).build();
+ final LedgerInfo newLedgerInfo =
ledgerInfo.toBuilder().setOffloadContext(context).build();
+ updatedLedgerId = idInfo.getKey();
+ updatedLedgerInfo = newLedgerInfo;
+ offloadSegments.add(new OffloadSegmentInfoImpl(uuid, ledgerId,
beginEntry, driverName, driverMetadata));
+ break;
+ }
+ }
+ log.debug("updated ledgerId: {}", updatedLedgerId);
+ ledgers.put(updatedLedgerId, updatedLedgerInfo);
Review comment:
Looks like `updatedLedgerId` and `updatedLedgerInfo` can be null?
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -780,12 +1088,99 @@ private boolean beforeAddEntry(OpAddEntry addOperation) {
}
}
+ /**
+ * This method should not block the thread, if the buffer is full then use
another runnable to fill data
+ * when buffer available.
+ *
+ * @param addOperation
+ */
+ protected synchronized void addToOffload(OpAddEntry addOperation) {
+ if (currentOffloadHandle == null) {
+ return;
+ }
+ final PositionImpl positionNextToOffered = getNextValidPosition(
+ PositionImpl.get(currentOffloadHandle.lastOffered()));
+
+ final PositionImpl offeringPosition = addOperation.getPosition();
+ if (positionNextToOffered
+ .equals(offeringPosition)) {
+ final EntryImpl entry = EntryImpl
+ .create(PositionImpl.get(addOperation.ledger.getId(),
addOperation.getEntryId()),
+ addOperation.getData());
+
+ OfferEntryResult offerEntryResult =
currentOffloadHandle.offerEntry(entry);
+ entry.release();
+ switch (offerEntryResult) {
+ case SUCCESS:
+ //happy case
+ return;
+ case FAIL_SEGMENT_CLOSED:
+ log.debug("segment closed");
+ return;
+ case FAIL_BUFFER_FULL:
+ log.debug("buffer full");
+ break;
+ }
+ }
+
+ if (offloadEntryFillTask == null || offloadEntryFillTask.isDone()) {
+ offloadEntryFillTask = new CompletableFuture<>();
+ scheduledExecutor.schedule(safeRun(() ->
entryFillLoop(currentOffloadHandle, positionNextToOffered,
+ PositionImpl.get(offeringPosition),
+ offloadEntryFillTask)), 100, TimeUnit.MILLISECONDS);
+ } // else fill when next entry added
+ }
+
+ private void entryFillLoop(OffloadHandle offloadHandle,
+ PositionImpl beginPosition, PositionImpl
endPosition,
+ CompletableFuture<Void> offloadEntryFillTask) {
+ asyncReadEntry(beginPosition, new ReadEntryCallback() {
+ void delayExecute(OffloadHandle OffloadHandle,
+ PositionImpl beginPosition, PositionImpl
endPosition,
+ CompletableFuture<Void> offloadEntryFillTask) {
+ scheduledExecutor
+ .schedule(() -> entryFillLoop(OffloadHandle,
beginPosition, endPosition,
+ offloadEntryFillTask)
+ , 100, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void readEntryComplete(Entry entry, Object ctx) {
+ final OfferEntryResult offerEntryResult =
offloadHandle.offerEntry(entry);
+ entry.release();
+
+ switch (offerEntryResult) {
+ case FAIL_BUFFER_FULL:
+ delayExecute(offloadHandle, beginPosition,
endPosition, offloadEntryFillTask);
+ break;
+ case FAIL_SEGMENT_CLOSED:
+ log.debug("segment closed");
+ break;
+ case SUCCESS:
+ if (beginPosition == endPosition) {
+ offloadEntryFillTask.complete(null);
+ } else {
+ final PositionImpl nextPos =
getNextValidPosition(beginPosition);
+ entryFillLoop(offloadHandle, nextPos, endPosition,
+ offloadEntryFillTask);
+ }
+ break;
+ }
+ }
+
+ @Override
+ public void readEntryFailed(ManagedLedgerException exception,
Object ctx) {
+ offloadEntryFillTask.completeExceptionally(exception);
Review comment:
I don't see anyone handling the error from the `offloadEntryFillTask`, so we
might need to log errors?
##########
File path:
pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
##########
@@ -566,11 +566,13 @@ public void namespaces() throws Exception {
verify(mockNamespaces).clearOffloadDeleteLag("myprop/clust/ns1");
namespaces.run(split(
- "set-offload-policies myprop/clust/ns1 -r test-region -d
aws-s3 -b test-bucket -e http://test.endpoint -mbs 32M -rbs 5M -oat 10M -oae
10s -orp tiered-storage-first"));
- verify(mockNamespaces).setOffloadPolicies("myprop/clust/ns1",
- OffloadPolicies.create("aws-s3", "test-region", "test-bucket",
- "http://test.endpoint", null, null, 32 * 1024 * 1024,
5 * 1024 * 1024,
- 10 * 1024 * 1024L, 10000L,
OffloadPolicies.OffloadedReadPriority.TIERED_STORAGE_FIRST));
+ "set-offload-policies -r test-region -d aws-s3 -b test-bucket
-e http://test.endpoint -mbs 32M -rbs 5M -oat 10M -oae 10s -orp
tiered-storage-first --offloadMethod streaming-based myprop/clust/ns1"));
Review comment:
Add another test case and keep the old test case?
##########
File path:
pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
##########
@@ -96,6 +95,56 @@ public String getValue() {
}
}
+ @InterfaceAudience.Public
+ @InterfaceStability.Stable
+ public enum OffloadMethod {
+ /**
+ * Ledger based offload, one offload segment corresponding to a ledger
+ */
+ LEDGER_BASED("ledger-based"),
Review comment:
maybe `LEDGER` is enough.
##########
File path:
tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/StreamingOffloadTest.java
##########
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.mledger.offload.jcloud;
+
+import static
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.isStreamingOffloadCompleted;
+import static org.testng.Assert.assertEquals;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.LedgerOffloader;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import
org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloaderStreamingTest;
+import
org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration;
+import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.apache.pulsar.common.policies.data.OffloadPolicies;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class StreamingOffloadTest extends MockedBookKeeperTestCase {
+
+ BlobStoreManagedLedgerOffloaderStreamingTest offloaderGenerator = new
BlobStoreManagedLedgerOffloaderStreamingTest();
+
+ public StreamingOffloadTest() throws Exception {
+ }
+
+ @Test
+ public void testStreamingOffload() throws ManagedLedgerException,
InterruptedException, IOException,
+ ExecutionException {
+ LedgerOffloader offloader = offloaderGenerator.getOffloader(
+ new HashMap<String, String>() {{
+
put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_SIZE_IN_BYTES, "1000");
+
put(offloaderGenerator.getConfig().getKeys(TieredStorageConfiguration.METADATA_FIELD_MAX_BLOCK_SIZE)
+ .get(0), "5242880");
+
put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC, "600");
+ put("offloadMethod",
OffloadPolicies.OffloadMethod.STREAMING_BASED.getStrValue());
+ }}
+ );
+ System.out.println("offloader.getClass() = " + offloader.getClass());
Review comment:
use log?
##########
File path:
pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
##########
@@ -1851,6 +1865,14 @@ void run() throws PulsarAdminException {
maxBlockSizeInBytes, readBufferSizeInBytes,
offloadAfterThresholdInBytes,
offloadAfterElapsedInMillis, offloadedReadPriority);
+ if (offloadMethod != null) {
Review comment:
Don't we need to set a default value of `ledger-based` for the
offload method?
##########
File path:
tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestBaseOffload.java
##########
@@ -193,16 +196,93 @@ public void
testPublishOffloadAndConsumeViaThreshold(String serviceUrl, String a
}
}
+ public void testStreamingOffload(String serviceUrl, String adminUrl)
throws Exception {
+ final String tenant = "offload-test-threshold-" + randomName(4);
+ final String namespace = tenant + "/ns2";
+ final String topic = "persistent://" + namespace + "/topic1";
+
+ pulsarCluster.runAdminCommandOnAnyBroker("tenants",
+ "create", "--allowed-clusters", pulsarCluster.getClusterName(),
+ "--admin-roles", "offload-admin", tenant);
+
+ pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
+ "create", "--clusters", pulsarCluster.getClusterName(),
namespace);
+
+ pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
"set-offload-policies",
+ "-d", getEnv().get("managedLedgerOffloadDriver"),
+ "-b", getEnv().get("s3ManagedLedgerOffloadBucket"),
+ "-e", getEnv().get("s3ManagedLedgerOffloadServiceEndpoint"),
+ "--maxSegmentRolloverTimeSec", "5", //let segment close fast
+ "-mbs", "32M", "-rbs", "5M", "-oat", "10M", "-oae", "10s",
+ "-orp", "tiered-storage-first", "--offloadMethod",
"streaming-based", namespace);
+
+ long firstLedger = 0;
+ List<MessageId> messageIds = new LinkedList<>();
+ try (PulsarClient client =
PulsarClient.builder().serviceUrl(serviceUrl).build();
+ Producer<byte[]> producer = client.newProducer().topic(topic)
+ .blockIfQueueFull(true).enableBatching(false).create();
+ ) {
+
+
client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe().close();
+
+ // write enough to topic to make it roll twice
+ for (int i = 0; i < ENTRIES_PER_LEDGER * 2.5; i++) {
+ final CompletableFuture<MessageId> messageId = producer
+ .sendAsync(buildEntry("offload-message" + i));
+ messageIds.add(messageId.get());
+ }
+
+ producer.flush();
+ }
+
+ try (PulsarAdmin admin =
PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) {
+ firstLedger =
admin.topics().getInternalStats(topic).ledgers.get(0).ledgerId;
+
+ // wait up to 30 seconds for offload to occur
+ for (int i = 0; i < 300 &&
!admin.topics().getInternalStats(topic).ledgers.get(0).offloaded; i++) {
Review comment:
Use `Awaitility` to do this?
##########
File path:
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
##########
@@ -94,7 +94,7 @@
final private long maxBufferLength;
final private ConcurrentLinkedQueue<Entry> offloadBuffer = new
ConcurrentLinkedQueue<>();
private CompletableFuture<OffloadResult> offloadResult;
- private volatile PositionImpl lastOfferedPosition = PositionImpl.latest;
+ private volatile PositionImpl lastOfferedPosition = PositionImpl.earliest;
Review comment:
Why do you need to change this?
##########
File path:
pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
##########
@@ -96,6 +95,56 @@ public String getValue() {
}
}
+ @InterfaceAudience.Public
+ @InterfaceStability.Stable
+ public enum OffloadMethod {
+ /**
+ * Ledger based offload, one offload segment corresponding to a ledger
+ */
+ LEDGER_BASED("ledger-based"),
+ /**
+ * Streaming offload, offload segments are divided by time or size
+ */
+ STREAMING_BASED("streaming-based"),
Review comment:
`STREAMING`
##########
File path:
tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/StreamingOffloadTest.java
##########
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.mledger.offload.jcloud;
+
+import static
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.isStreamingOffloadCompleted;
+import static org.testng.Assert.assertEquals;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.LedgerOffloader;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import
org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloaderStreamingTest;
+import
org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration;
+import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.apache.pulsar.common.policies.data.OffloadPolicies;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class StreamingOffloadTest extends MockedBookKeeperTestCase {
+
+ BlobStoreManagedLedgerOffloaderStreamingTest offloaderGenerator = new
BlobStoreManagedLedgerOffloaderStreamingTest();
+
+ public StreamingOffloadTest() throws Exception {
+ }
+
+ @Test
+ public void testStreamingOffload() throws ManagedLedgerException,
InterruptedException, IOException,
+ ExecutionException {
+ LedgerOffloader offloader = offloaderGenerator.getOffloader(
+ new HashMap<String, String>() {{
+
put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_SIZE_IN_BYTES, "1000");
+
put(offloaderGenerator.getConfig().getKeys(TieredStorageConfiguration.METADATA_FIELD_MAX_BLOCK_SIZE)
+ .get(0), "5242880");
+
put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC, "600");
+ put("offloadMethod",
OffloadPolicies.OffloadMethod.STREAMING_BASED.getStrValue());
+ }}
+ );
+ System.out.println("offloader.getClass() = " + offloader.getClass());
+ int ENTRIES_PER_LEDGER = 10;
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(ENTRIES_PER_LEDGER);
+ config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
+ config.setRetentionTime(10, TimeUnit.MINUTES);
+ config.setRetentionSizeInMB(10);
+ config.setLedgerOffloader(offloader);
+
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open("my_test_ledger", config);
+
+ for (int i = 0; i < ENTRIES_PER_LEDGER * 2.5; i++) {
+ String content = "entry-" + i;
+ ledger.addEntry(content.getBytes());
+ }
+ final LedgerOffloader.OffloadHandle currentOffloadHandle =
ledger.getCurrentOffloadHandle();
+ currentOffloadHandle.close();
+ final LedgerOffloader.OffloadResult offloadResult =
currentOffloadHandle.getOffloadResultAsync().get();
+ log.info("offloadResult = " + offloadResult);
+ log.info("offload method: " + ledger.getOffloadMethod());
+ }
+
+ @Test
+ public void testStreamingOffloadAndRead() throws ManagedLedgerException,
InterruptedException, IOException,
+ ExecutionException {
+ LedgerOffloader offloader = offloaderGenerator.getOffloader(
+ new HashMap<String, String>() {{
+
put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_SIZE_IN_BYTES, "1000");
+
put(offloaderGenerator.getConfig().getKeys(TieredStorageConfiguration.METADATA_FIELD_MAX_BLOCK_SIZE)
+ .get(0), "5242880");
+
put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC, "600");
+ put("offloadMethod",
OffloadPolicies.OffloadMethod.STREAMING_BASED.getStrValue());
+ }}
+ );
+ int ENTRIES_PER_LEDGER = 10;
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(ENTRIES_PER_LEDGER);
+ config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
+ config.setRetentionTime(10, TimeUnit.MINUTES);
+ config.setRetentionSizeInMB(10);
+ config.setLedgerOffloader(offloader);
+
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open("test_read_and_write", config);
+
+ for (int i = 0; i < ENTRIES_PER_LEDGER * 2.5; i++) {
+ String content = "entry-" + i;
+ ledger.addEntry(content.getBytes());
+ }
+ final LedgerOffloader.OffloadHandle currentOffloadHandle =
ledger.getCurrentOffloadHandle();
+ currentOffloadHandle.close();
+ final LedgerOffloader.OffloadResult offloadResult =
currentOffloadHandle.getOffloadResultAsync().get();
+ log.info("offloadResult = " + offloadResult);
+ log.info("offload method: " + ledger.getOffloadMethod());
+ while
(!isStreamingOffloadCompleted(ledger.getLedgersInfoAsList().get(0))) {
Review comment:
use `Awaitility`?
##########
File path:
tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/StreamingOffloadTest.java
##########
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.mledger.offload.jcloud;
+
+import static
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.isStreamingOffloadCompleted;
+import static org.testng.Assert.assertEquals;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.LedgerOffloader;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import
org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloaderStreamingTest;
+import
org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration;
+import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.apache.pulsar.common.policies.data.OffloadPolicies;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class StreamingOffloadTest extends MockedBookKeeperTestCase {
+
+ BlobStoreManagedLedgerOffloaderStreamingTest offloaderGenerator = new
BlobStoreManagedLedgerOffloaderStreamingTest();
+
+ public StreamingOffloadTest() throws Exception {
+ }
+
+ @Test
+ public void testStreamingOffload() throws ManagedLedgerException,
InterruptedException, IOException,
+ ExecutionException {
+ LedgerOffloader offloader = offloaderGenerator.getOffloader(
+ new HashMap<String, String>() {{
+
put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_SIZE_IN_BYTES, "1000");
+
put(offloaderGenerator.getConfig().getKeys(TieredStorageConfiguration.METADATA_FIELD_MAX_BLOCK_SIZE)
+ .get(0), "5242880");
+
put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC, "600");
+ put("offloadMethod",
OffloadPolicies.OffloadMethod.STREAMING_BASED.getStrValue());
+ }}
+ );
+ System.out.println("offloader.getClass() = " + offloader.getClass());
+ int ENTRIES_PER_LEDGER = 10;
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(ENTRIES_PER_LEDGER);
+ config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
+ config.setRetentionTime(10, TimeUnit.MINUTES);
+ config.setRetentionSizeInMB(10);
+ config.setLedgerOffloader(offloader);
+
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open("my_test_ledger", config);
+
+ for (int i = 0; i < ENTRIES_PER_LEDGER * 2.5; i++) {
+ String content = "entry-" + i;
+ ledger.addEntry(content.getBytes());
+ }
+ final LedgerOffloader.OffloadHandle currentOffloadHandle =
ledger.getCurrentOffloadHandle();
+ currentOffloadHandle.close();
+ final LedgerOffloader.OffloadResult offloadResult =
currentOffloadHandle.getOffloadResultAsync().get();
+ log.info("offloadResult = " + offloadResult);
+ log.info("offload method: " + ledger.getOffloadMethod());
+ }
+
+ @Test
+ public void testStreamingOffloadAndRead() throws ManagedLedgerException,
InterruptedException, IOException,
+ ExecutionException {
+ LedgerOffloader offloader = offloaderGenerator.getOffloader(
+ new HashMap<String, String>() {{
+
put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_SIZE_IN_BYTES, "1000");
+
put(offloaderGenerator.getConfig().getKeys(TieredStorageConfiguration.METADATA_FIELD_MAX_BLOCK_SIZE)
+ .get(0), "5242880");
+
put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC, "600");
+ put("offloadMethod",
OffloadPolicies.OffloadMethod.STREAMING_BASED.getStrValue());
+ }}
+ );
+ int ENTRIES_PER_LEDGER = 10;
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(ENTRIES_PER_LEDGER);
+ config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
+ config.setRetentionTime(10, TimeUnit.MINUTES);
+ config.setRetentionSizeInMB(10);
+ config.setLedgerOffloader(offloader);
+
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open("test_read_and_write", config);
+
+ for (int i = 0; i < ENTRIES_PER_LEDGER * 2.5; i++) {
+ String content = "entry-" + i;
+ ledger.addEntry(content.getBytes());
+ }
+ final LedgerOffloader.OffloadHandle currentOffloadHandle =
ledger.getCurrentOffloadHandle();
+ currentOffloadHandle.close();
+ final LedgerOffloader.OffloadResult offloadResult =
currentOffloadHandle.getOffloadResultAsync().get();
+ log.info("offloadResult = " + offloadResult);
+ log.info("offload method: " + ledger.getOffloadMethod());
+ while
(!isStreamingOffloadCompleted(ledger.getLedgersInfoAsList().get(0))) {
+ Thread.sleep(10);
+ }
+
Assert.assertTrue(isStreamingOffloadCompleted(ledger.getLedgersInfoAsList().get(0)));
+
Assert.assertTrue(isStreamingOffloadCompleted(ledger.getLedgersInfoAsList().get(1)));
+
+ ManagedCursor cursor =
ledger.newNonDurableCursor(PositionImpl.earliest);
+ int i = 0;
+ for (Entry e : cursor.readEntries(10)) {
+ assertEquals(new String(e.getData()), "entry-" + i++);
+ }
+
+ for (Entry e : cursor.readEntries(10)) {
+ assertEquals(new String(e.getData()), "entry-" + i++);
+ }
+
+ for (Entry e : cursor.readEntries(5)) {
+ assertEquals(new String(e.getData()), "entry-" + i++);
+ }
+ }
+
+ @Test
+ public void testSetStreamingOffloadAfterCreated() throws
ManagedLedgerException, InterruptedException, IOException,
+ ExecutionException {
+ LedgerOffloader offloader = offloaderGenerator.getOffloader(
+ new HashMap<String, String>() {{
+
put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_SIZE_IN_BYTES, "1000");
+
put(offloaderGenerator.getConfig().getKeys(TieredStorageConfiguration.METADATA_FIELD_MAX_BLOCK_SIZE)
+ .get(0), "5242880");
+
put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC, "600");
+ put("offloadMethod",
OffloadPolicies.OffloadMethod.LEDGER_BASED.getStrValue());
+ }}
+ );
+ int ENTRIES_PER_LEDGER = 10;
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(ENTRIES_PER_LEDGER);
+ config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
+ config.setRetentionTime(10, TimeUnit.MINUTES);
+ config.setRetentionSizeInMB(10);
+ config.setLedgerOffloader(offloader);
+
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open("test_read_and_write", config);
+ int beforeSetStreaming = 1;
+ for (int i = 0; i < beforeSetStreaming; i++) {
+ String content = "entry-" + i;
+ ledger.addEntry(content.getBytes());
+ }
+
+ LedgerOffloader streamingOffloader = offloaderGenerator.getOffloader(
+ new HashMap<String, String>() {{
+
put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_SIZE_IN_BYTES, "1000");
+
put(offloaderGenerator.getConfig().getKeys(TieredStorageConfiguration.METADATA_FIELD_MAX_BLOCK_SIZE)
+ .get(0), "5242880");
+
put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC, "600");
+ put("offloadMethod",
OffloadPolicies.OffloadMethod.STREAMING_BASED.getStrValue());
+ }}
+ );
+
+ config.setLedgerOffloader(streamingOffloader);
+ ledger.setConfig(config);
+
+ for (int i = beforeSetStreaming; i < ENTRIES_PER_LEDGER * 2.5; i++) {
+ String content = "entry-" + i;
+ ledger.addEntry(content.getBytes());
+ }
+ Thread.sleep(5000);
Review comment:
Do we have any explicit conditions we can verify?
##########
File path:
tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/StreamingOffloadTest.java
##########
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.mledger.offload.jcloud;
+
+import static
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.isStreamingOffloadCompleted;
+import static org.testng.Assert.assertEquals;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.LedgerOffloader;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import
org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloaderStreamingTest;
+import
org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration;
+import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.apache.pulsar.common.policies.data.OffloadPolicies;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class StreamingOffloadTest extends MockedBookKeeperTestCase {
+
+ BlobStoreManagedLedgerOffloaderStreamingTest offloaderGenerator = new
BlobStoreManagedLedgerOffloaderStreamingTest();
+
+ public StreamingOffloadTest() throws Exception {
+ }
+
+ @Test
+ public void testStreamingOffload() throws ManagedLedgerException,
InterruptedException, IOException,
+ ExecutionException {
+ LedgerOffloader offloader = offloaderGenerator.getOffloader(
+ new HashMap<String, String>() {{
+
put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_SIZE_IN_BYTES, "1000");
+
put(offloaderGenerator.getConfig().getKeys(TieredStorageConfiguration.METADATA_FIELD_MAX_BLOCK_SIZE)
+ .get(0), "5242880");
+
put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC, "600");
+ put("offloadMethod",
OffloadPolicies.OffloadMethod.STREAMING_BASED.getStrValue());
+ }}
+ );
+ System.out.println("offloader.getClass() = " + offloader.getClass());
+ int ENTRIES_PER_LEDGER = 10;
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(ENTRIES_PER_LEDGER);
+ config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
+ config.setRetentionTime(10, TimeUnit.MINUTES);
+ config.setRetentionSizeInMB(10);
+ config.setLedgerOffloader(offloader);
+
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open("my_test_ledger", config);
+
+ for (int i = 0; i < ENTRIES_PER_LEDGER * 2.5; i++) {
+ String content = "entry-" + i;
+ ledger.addEntry(content.getBytes());
+ }
+ final LedgerOffloader.OffloadHandle currentOffloadHandle =
ledger.getCurrentOffloadHandle();
+ currentOffloadHandle.close();
+ final LedgerOffloader.OffloadResult offloadResult =
currentOffloadHandle.getOffloadResultAsync().get();
+ log.info("offloadResult = " + offloadResult);
+ log.info("offload method: " + ledger.getOffloadMethod());
+ }
+
+ @Test
+ public void testStreamingOffloadAndRead() throws ManagedLedgerException,
InterruptedException, IOException,
+ ExecutionException {
+ LedgerOffloader offloader = offloaderGenerator.getOffloader(
+ new HashMap<String, String>() {{
+
put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_SIZE_IN_BYTES, "1000");
+
put(offloaderGenerator.getConfig().getKeys(TieredStorageConfiguration.METADATA_FIELD_MAX_BLOCK_SIZE)
+ .get(0), "5242880");
+
put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC, "600");
+ put("offloadMethod",
OffloadPolicies.OffloadMethod.STREAMING_BASED.getStrValue());
+ }}
+ );
+ int ENTRIES_PER_LEDGER = 10;
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(ENTRIES_PER_LEDGER);
+ config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
+ config.setRetentionTime(10, TimeUnit.MINUTES);
+ config.setRetentionSizeInMB(10);
+ config.setLedgerOffloader(offloader);
+
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open("test_read_and_write", config);
+
+ for (int i = 0; i < ENTRIES_PER_LEDGER * 2.5; i++) {
+ String content = "entry-" + i;
+ ledger.addEntry(content.getBytes());
+ }
+ final LedgerOffloader.OffloadHandle currentOffloadHandle =
ledger.getCurrentOffloadHandle();
+ currentOffloadHandle.close();
+ final LedgerOffloader.OffloadResult offloadResult =
currentOffloadHandle.getOffloadResultAsync().get();
+ log.info("offloadResult = " + offloadResult);
+ log.info("offload method: " + ledger.getOffloadMethod());
+ while
(!isStreamingOffloadCompleted(ledger.getLedgersInfoAsList().get(0))) {
+ Thread.sleep(10);
+ }
+
Assert.assertTrue(isStreamingOffloadCompleted(ledger.getLedgersInfoAsList().get(0)));
+
Assert.assertTrue(isStreamingOffloadCompleted(ledger.getLedgersInfoAsList().get(1)));
+
+ ManagedCursor cursor =
ledger.newNonDurableCursor(PositionImpl.earliest);
+ int i = 0;
+ for (Entry e : cursor.readEntries(10)) {
+ assertEquals(new String(e.getData()), "entry-" + i++);
+ }
+
+ for (Entry e : cursor.readEntries(10)) {
+ assertEquals(new String(e.getData()), "entry-" + i++);
+ }
+
+ for (Entry e : cursor.readEntries(5)) {
+ assertEquals(new String(e.getData()), "entry-" + i++);
+ }
+ }
+
+ @Test
+ public void testSetStreamingOffloadAfterCreated() throws
ManagedLedgerException, InterruptedException, IOException,
+ ExecutionException {
+ LedgerOffloader offloader = offloaderGenerator.getOffloader(
+ new HashMap<String, String>() {{
+
put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_SIZE_IN_BYTES, "1000");
+
put(offloaderGenerator.getConfig().getKeys(TieredStorageConfiguration.METADATA_FIELD_MAX_BLOCK_SIZE)
+ .get(0), "5242880");
+
put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC, "600");
+ put("offloadMethod",
OffloadPolicies.OffloadMethod.LEDGER_BASED.getStrValue());
+ }}
+ );
+ int ENTRIES_PER_LEDGER = 10;
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(ENTRIES_PER_LEDGER);
+ config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
+ config.setRetentionTime(10, TimeUnit.MINUTES);
+ config.setRetentionSizeInMB(10);
+ config.setLedgerOffloader(offloader);
+
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open("test_read_and_write", config);
+ int beforeSetStreaming = 1;
+ for (int i = 0; i < beforeSetStreaming; i++) {
+ String content = "entry-" + i;
+ ledger.addEntry(content.getBytes());
+ }
+
+ LedgerOffloader streamingOffloader = offloaderGenerator.getOffloader(
+ new HashMap<String, String>() {{
+
put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_SIZE_IN_BYTES, "1000");
+
put(offloaderGenerator.getConfig().getKeys(TieredStorageConfiguration.METADATA_FIELD_MAX_BLOCK_SIZE)
+ .get(0), "5242880");
+
put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC, "600");
+ put("offloadMethod",
OffloadPolicies.OffloadMethod.STREAMING_BASED.getStrValue());
+ }}
+ );
+
+ config.setLedgerOffloader(streamingOffloader);
+ ledger.setConfig(config);
+
+ for (int i = beforeSetStreaming; i < ENTRIES_PER_LEDGER * 2.5; i++) {
+ String content = "entry-" + i;
+ ledger.addEntry(content.getBytes());
+ }
+ Thread.sleep(5000);
+ final LedgerOffloader.OffloadHandle currentOffloadHandle =
ledger.getCurrentOffloadHandle();
+ currentOffloadHandle.close();
+ final LedgerOffloader.OffloadResult offloadResult =
currentOffloadHandle.getOffloadResultAsync().get();
+ log.info("offloadResult = " + offloadResult);
+ log.info("offload method: " + ledger.getOffloadMethod());
+ while
(!isStreamingOffloadCompleted(ledger.getLedgersInfoAsList().get(0))) {
Review comment:
use `Awaitility`
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]