ivankelly commented on a change in pull request #1746: PIP-17: impl offload() 
for S3ManagedLedgerOffloader
URL: https://github.com/apache/incubator-pulsar/pull/1746#discussion_r187073441
 
 

 ##########
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
 ##########
 @@ -61,21 +73,81 @@ public S3ManagedLedgerOffloader(ServiceConfiguration conf,
         s3client = builder.build();
         this.bucket = bucket;
         this.scheduler = scheduler;
+        this.maxBlockSize = 
conf.getS3ManagedLedgerOffloadMaxBlockSizeInBytes();
+    }
+
+    static String offloadKey(ReadHandle readHandle, UUID uuid) {
+        return String.format("ledger-%d-%s", readHandle.getId(), 
uuid.toString());
     }
 
     @Override
-    public CompletableFuture<Void> offload(ReadHandle ledger,
-                                           UUID uid,
+    public CompletableFuture<Void> offload(ReadHandle readHandle,
+                                           UUID uuid,
                                            Map<String, String> extraMetadata) {
         CompletableFuture<Void> promise = new CompletableFuture<>();
         scheduler.submit(() -> {
-                try {
-                    s3client.putObject(bucket, uid.toString(), uid.toString());
-                    promise.complete(null);
-                } catch (Throwable t) {
-                    promise.completeExceptionally(t);
+            try {
+                String key = offloadKey(readHandle, uuid);
+                InitiateMultipartUploadResult initRes = 
s3client.initiateMultipartUpload(
+                    new InitiateMultipartUploadRequest(this.bucket, key));
+                OffloadIndexBlockBuilder indexBuilder = 
OffloadIndexBlockBuilder.create()
+                    .withMetadata(readHandle.getLedgerMetadata());
+
+                long startEntry = 0;
+                int partId = 2;
+                long bytesUploaded = 0;
+                List<PartETag> etags = new LinkedList<>();
+                while (startEntry <= readHandle.getLastAddConfirmed()) {
+                    int blockSize = BlockAwareSegmentInputStreamImpl
+                        .getBlockSize(readHandle, maxBlockSize, startEntry, 
bytesUploaded);
+
+                    BlockAwareSegmentInputStreamImpl blockStream = new 
BlockAwareSegmentInputStreamImpl(
+                        readHandle, startEntry, blockSize);
+
+                    UploadPartResult uploadRes = s3client.uploadPart(
+                        new UploadPartRequest()
+                            .withBucketName(bucket)
+                            .withKey(key)
+                            .withUploadId(initRes.getUploadId())
+                            .withInputStream(blockStream)
+                            .withPartSize(blockSize)
+                            .withPartNumber(partId));
+                    etags.add(uploadRes.getPartETag());
+
+                    indexBuilder.addBlock(startEntry, partId, blockSize);
+
+                    bytesUploaded += blockStream.getBlockEntryBytesCount();
+                    startEntry = blockStream.getEndEntryId() + 1;
+                    partId++;
+                    blockStream.close();
                 }
-            });
+
+                try (OffloadIndexBlock index = indexBuilder.build()) {
 
 Review comment:
   The index should go into a different object, so that the entries can be 
completed before the index is written.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to