ivankelly commented on a change in pull request #1746: PIP-17: impl offload() for S3ManagedLedgerOffloader URL: https://github.com/apache/incubator-pulsar/pull/1746#discussion_r187073441
########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java ########## @@ -61,21 +73,81 @@ public S3ManagedLedgerOffloader(ServiceConfiguration conf, s3client = builder.build(); this.bucket = bucket; this.scheduler = scheduler; + this.maxBlockSize = conf.getS3ManagedLedgerOffloadMaxBlockSizeInBytes(); + } + + static String offloadKey(ReadHandle readHandle, UUID uuid) { + return String.format("ledger-%d-%s", readHandle.getId(), uuid.toString()); } @Override - public CompletableFuture<Void> offload(ReadHandle ledger, - UUID uid, + public CompletableFuture<Void> offload(ReadHandle readHandle, + UUID uuid, Map<String, String> extraMetadata) { CompletableFuture<Void> promise = new CompletableFuture<>(); scheduler.submit(() -> { - try { - s3client.putObject(bucket, uid.toString(), uid.toString()); - promise.complete(null); - } catch (Throwable t) { - promise.completeExceptionally(t); + try { + String key = offloadKey(readHandle, uuid); + InitiateMultipartUploadResult initRes = s3client.initiateMultipartUpload( + new InitiateMultipartUploadRequest(this.bucket, key)); + OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create() + .withMetadata(readHandle.getLedgerMetadata()); + + long startEntry = 0; + int partId = 2; + long bytesUploaded = 0; + List<PartETag> etags = new LinkedList<>(); + while (startEntry <= readHandle.getLastAddConfirmed()) { + int blockSize = BlockAwareSegmentInputStreamImpl + .getBlockSize(readHandle, maxBlockSize, startEntry, bytesUploaded); + + BlockAwareSegmentInputStreamImpl blockStream = new BlockAwareSegmentInputStreamImpl( + readHandle, startEntry, blockSize); + + UploadPartResult uploadRes = s3client.uploadPart( + new UploadPartRequest() + .withBucketName(bucket) + .withKey(key) + .withUploadId(initRes.getUploadId()) + .withInputStream(blockStream) + .withPartSize(blockSize) + .withPartNumber(partId)); + etags.add(uploadRes.getPartETag()); + + indexBuilder.addBlock(startEntry, partId, blockSize); + + bytesUploaded += blockStream.getBlockEntryBytesCount(); + startEntry = blockStream.getEndEntryId() + 1; + partId++; + blockStream.close(); } - }); + + try (OffloadIndexBlock index = indexBuilder.build()) { Review comment: Should go into a different object, so the entries can be completed before the index is written ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services