codelipenghui commented on a change in pull request #9202:
URL: https://github.com/apache/pulsar/pull/9202#discussion_r568707985



##########
File path: 
pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
##########
@@ -148,6 +200,11 @@ public String getValue() {
     @Configuration
     @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private OffloadedReadPriority managedLedgerOffloadedReadPriority = 
DEFAULT_OFFLOADED_READ_PRIORITY;
+    @Configuration
+    private OffloadMethod offloadMethod = DEFAULT_OFFLOAD_METHOD;

Review comment:
       And is it able to configure at the broker.conf? Seems not related 
changes.

##########
File path: 
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
##########
@@ -94,7 +94,7 @@
     final private long maxBufferLength;
     final private ConcurrentLinkedQueue<Entry> offloadBuffer = new 
ConcurrentLinkedQueue<>();
     private CompletableFuture<OffloadResult> offloadResult;
-    private volatile PositionImpl lastOfferedPosition = PositionImpl.latest;
+    private volatile PositionImpl lastOfferedPosition = PositionImpl.earliest;

Review comment:
       +1 any reason we need to change the latest to the earliest, seem this 
will introduce the opposite behaivor

##########
File path: 
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
##########
@@ -91,6 +90,10 @@ public OffloadResult(long beginLedger, long beginEntry, long 
endLedger, long end
     String METADATA_SOFTWARE_VERSION_KEY = 
"S3ManagedLedgerOffloaderSoftwareVersion";
     String METADATA_SOFTWARE_GITSHA_KEY = 
"S3ManagedLedgerOffloaderSoftwareGitSha";
 
+    default LedgerOffloader fork() {
+        throw new UnsupportedOperationException(this.getClass().toString() + " 
fork()");
+    }

Review comment:
       What is this method used for? If it is a public method in the 
LedgerOffloader, could you please give more description of this method?

##########
File path: 
pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
##########
@@ -148,6 +200,11 @@ public String getValue() {
     @Configuration
     @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private OffloadedReadPriority managedLedgerOffloadedReadPriority = 
DEFAULT_OFFLOADED_READ_PRIORITY;
+    @Configuration
+    private OffloadMethod offloadMethod = DEFAULT_OFFLOAD_METHOD;

Review comment:
       Please add `@JsonProperty(access = JsonProperty.Access.READ_WRITE)`, 
Otherwise the Pulsar SQL will not work while the presto coordinator assigns the 
splits to works. More details at https://github.com/apache/pulsar/pull/9300
   
   Please check all.

##########
File path: 
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
##########
@@ -163,7 +166,7 @@ public OffloadResult(long beginLedger, long beginEntry, 
long endLedger, long end
     default CompletableFuture<OffloadHandle> streamingOffload(ManagedLedger 
ml, UUID uid, long beginLedger,
                                                               long beginEntry,
                                                               Map<String, 
String> driverMetadata) {
-        throw new UnsupportedOperationException();
+        throw new UnsupportedOperationException(this.getClass().toString() + " 
streamingOffload");

Review comment:
       Why not remove the default implementation in the LedgerOffloader?

##########
File path: 
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
##########
@@ -198,12 +201,18 @@ public OffloadResult(long beginLedger, long beginEntry, 
long endLedger, long end
 
     default CompletableFuture<ReadHandle> readOffloaded(long ledgerId, 
MLDataFormats.OffloadContext ledgerContext,
                                                         Map<String, String> 
offloadDriverMetadata) {
-        throw new UnsupportedOperationException();
+        final UUID uuid = new UUID(ledgerContext.getUidMsb(), 
ledgerContext.getUidLsb());
+        if (ledgerContext.hasComplete() && ledgerContext.getComplete()) {
+            //ledger based offloading
+            return readOffloaded(ledgerId, uuid, offloadDriverMetadata);
+        } else {
+            throw new UnsupportedOperationException(this.getClass().toString() 
+ " readOffloaded");
+        }
     }
 
     default CompletableFuture<Void> deleteOffloaded(UUID uid,
                                                     Map<String, String> 
offloadDriverMetadata) {
-        throw new UnsupportedOperationException();
+        throw new UnsupportedOperationException(this.getClass().toString() + " 
deleteOffloaded");

Review comment:
       Please give a meaningful exception message.

##########
File path: 
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
##########
@@ -198,12 +201,18 @@ public OffloadResult(long beginLedger, long beginEntry, 
long endLedger, long end
 
     default CompletableFuture<ReadHandle> readOffloaded(long ledgerId, 
MLDataFormats.OffloadContext ledgerContext,
                                                         Map<String, String> 
offloadDriverMetadata) {
-        throw new UnsupportedOperationException();
+        final UUID uuid = new UUID(ledgerContext.getUidMsb(), 
ledgerContext.getUidLsb());
+        if (ledgerContext.hasComplete() && ledgerContext.getComplete()) {
+            //ledger based offloading
+            return readOffloaded(ledgerId, uuid, offloadDriverMetadata);
+        } else {
+            throw new UnsupportedOperationException(this.getClass().toString() 
+ " readOffloaded");

Review comment:
       Should complete the CompletableFuture with exception. And please give a 
meaningful exception message.

##########
File path: 
pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
##########
@@ -96,6 +95,56 @@ public String getValue() {
         }
     }
 
+    @InterfaceAudience.Public
+    @InterfaceStability.Stable
+    public enum OffloadMethod {
+        /**
+         * Ledger based offload, one offload segment corresponding to a ledger
+         */
+        LEDGER_BASED("ledger-based"),
+        /**
+         * Streaming offload, offload segments are divided by time or size
+         */
+        STREAMING_BASED("streaming-based"),
+        /**
+         * Disable offload
+         */
+        NONE("none");

Review comment:
       Why need to disable the tiered storage through the offload method? I 
think the current behavior of enabled or disabled the tiered storage is based 
on the OffloadPolicies is null or not?

##########
File path: 
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -780,12 +1088,99 @@ private boolean beforeAddEntry(OpAddEntry addOperation) {
         }
     }
 
+    /**
+     * This method should not block the thread, if the buffer is full then use 
another runnable to fill data
+     * when buffer available.
+     *
+     * @param addOperation
+     */
+    protected synchronized void addToOffload(OpAddEntry addOperation) {

Review comment:
       Why need synchronized here? The OfAddEntry callback always called by 
single thread?

##########
File path: 
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -780,12 +1088,99 @@ private boolean beforeAddEntry(OpAddEntry addOperation) {
         }
     }
 
+    /**
+     * This method should not block the thread, if the buffer is full then use 
another runnable to fill data
+     * when buffer available.
+     *
+     * @param addOperation
+     */
+    protected synchronized void addToOffload(OpAddEntry addOperation) {
+        if (currentOffloadHandle == null) {
+            return;
+        }
+        final PositionImpl positionNextToOffered = getNextValidPosition(
+                PositionImpl.get(currentOffloadHandle.lastOffered()));
+
+        final PositionImpl offeringPosition = addOperation.getPosition();
+        if (positionNextToOffered
+                .equals(offeringPosition)) {
+            final EntryImpl entry = EntryImpl
+                    .create(PositionImpl.get(addOperation.ledger.getId(), 
addOperation.getEntryId()),
+                            addOperation.getData());
+
+            OfferEntryResult offerEntryResult = 
currentOffloadHandle.offerEntry(entry);
+            entry.release();
+            switch (offerEntryResult) {
+                case SUCCESS:
+                    //happy case
+                    return;
+                case FAIL_SEGMENT_CLOSED:
+                    log.debug("segment closed");

Review comment:
       More meaningful logs can help troubleshoot problems, could you please 
check all?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to