Renkai commented on a change in pull request #9202:
URL: https://github.com/apache/pulsar/pull/9202#discussion_r569112461



##########
File path: 
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -386,6 +408,292 @@ public void operationFailed(MetaStoreException e) {
         scheduleRollOverLedgerTask();
     }
 
+    /**
+     * Should be called after `ledgers` were initialized.
+     */
+    void initializeStreamingOffloader() {
+        if (getOffloadMethod() == OffloadMethod.STREAMING_BASED) {
+            log.info("Streaming offload enabled for managed ledger: {}", name);
+        } else {
+            log.info("Streaming offload not enabled for managed ledger: {}", 
name);
+            return;
+        }
+
+        if (!offloadMutex.tryLock()) {
+            log.info("try streaming offload,but already offloading");
+            return;
+        }
+
+        //get newest config and drop progress status of last offload
+        offloader = getConfig().getLedgerOffloader().fork();
+
+        this.offloadSegments = Queues.newConcurrentLinkedQueue();
+
+        initializeSegments();
+
+        if (offloadSegments.isEmpty()) {
+            log.error("Streaming offloading began but there is no segments to 
offload, should not happen.");
+            throw new RuntimeException(
+                    "Streaming offloading began but there is no segments to 
offload, should not happen.");
+        }
+
+        startOffload();
+    }
+
+    private void initializeSegments() {
+        Long updatedLedgerId = null;
+        LedgerInfo updatedLedgerInfo = null;
+        for (Map.Entry<Long, LedgerInfo> idInfo : ledgers.entrySet()) {
+
+            final Long ledgerId = idInfo.getKey();
+            LedgerInfo ledgerInfo = idInfo.getValue();
+            String driverName = OffloadUtils.getOffloadDriverName(ledgerInfo,
+                    config.getLedgerOffloader().getOffloadDriverName());
+            Map<String, String> driverMetadata = 
OffloadUtils.getOffloadDriverMetadata(ledgerInfo,
+                    config.getLedgerOffloader().getOffloadDriverMetadata());
+
+            if (!ledgerInfo.hasOffloadContext()) {
+                final OffloadContext context = OffloadContext.newBuilder()
+                        .setComplete(false)
+                        .build();
+                ledgerInfo = 
ledgerInfo.toBuilder().setOffloadContext(context).build();
+            }
+
+            if (!isStreamingOffloadCompleted(ledgerInfo)) {
+                List<OffloadSegment> newSegments = Lists.newArrayList();
+                // Continue from incomplete context
+                long beginEntry = 0;
+                for (OffloadSegment offloadSegment : 
ledgerInfo.getOffloadContext().getOffloadSegmentList()) {
+                    if (offloadSegment.getComplete()) {
+                        if (!offloadSegment.hasEndEntryId()) {
+                            log.error("segment of ledger {} offload completed 
bug not have end entry id "
+                                    + "should not happen. {}", ledgerId, 
ledgerInfo);
+                        } else {
+                            beginEntry = offloadSegment.getEndEntryId() + 1;
+                            newSegments.add(offloadSegment);
+                        }
+                    }
+                }
+
+                UUID uuid = UUID.randomUUID();
+                final OffloadSegment.Builder segment = 
OffloadSegment.newBuilder()
+                        .setUidLsb(uuid.getLeastSignificantBits())
+                        .setUidMsb(uuid.getMostSignificantBits())
+                        .setAssignedTimestamp(System.currentTimeMillis())
+                        .setComplete(false);
+                OffloadUtils.setOffloadDriverMetadata(segment, driverName, 
driverMetadata);
+                newSegments.add(segment.build());
+                final OffloadContext context = 
ledgerInfo.getOffloadContext().toBuilder().clearOffloadSegment()
+                        .addAllOffloadSegment(newSegments).build();
+                final LedgerInfo newLedgerInfo = 
ledgerInfo.toBuilder().setOffloadContext(context).build();
+                updatedLedgerId = idInfo.getKey();
+                updatedLedgerInfo = newLedgerInfo;
+                offloadSegments.add(new OffloadSegmentInfoImpl(uuid, ledgerId, 
beginEntry, driverName, driverMetadata));
+                break;
+            }
+        }
+        log.debug("updated ledgerId: {}", updatedLedgerId);
+        ledgers.put(updatedLedgerId, updatedLedgerInfo);

Review comment:
       No, but I will move this up into the loop to avoid misleading




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to