This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c9d2e2663003532cb6e1e1a6f53c2b3c2449a880
Author: gaozhangmin <zhangmin...@apache.org>
AuthorDate: Mon Jan 9 19:39:09 2023 +0800

    [improve][admin]internalGetMessageById shouldn't be allowed on partitioned topic (#19013)
    
    Co-authored-by: gavingaozhangmin <gavingaozhang...@didiglobal.com>
    (cherry picked from commit b05fddb1af03456438f27217dc6979be00fac19e)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 97 ++++++++++++----------
 .../pulsar/broker/admin/v1/PersistentTopics.java   | 20 +++--
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 20 +++--
 3 files changed, 75 insertions(+), 62 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index d31092c5829..b5609892b09 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2763,60 +2763,65 @@ public class PersistentTopicsBase extends AdminResource {
         return seekPosition;
     }
 
-    protected void internalGetMessageById(AsyncResponse asyncResponse, long 
ledgerId, long entryId,
-                                              boolean authoritative) {
-        // will redirect if the topic not owned by current broker
-        validateTopicOwnershipAsync(topicName, authoritative)
-                .thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.PEEK_MESSAGES))
-                .thenCompose(__ -> {
-                    CompletableFuture<Void> ret;
-                    if (topicName.isGlobal()) {
-                        ret = 
validateGlobalNamespaceOwnershipAsync(namespaceName);
-                    } else {
-                        ret = CompletableFuture.completedFuture(null);
-                    }
-                    return ret;
-                })
-                .thenCompose(__ -> getTopicReferenceAsync(topicName))
-                .thenAccept(topic -> {
-                    ManagedLedgerImpl ledger =
-                            (ManagedLedgerImpl) ((PersistentTopic) 
topic).getManagedLedger();
-                    ledger.asyncReadEntry(new PositionImpl(ledgerId, entryId),
-                            new AsyncCallbacks.ReadEntryCallback() {
-                                @Override
-                                public void 
readEntryFailed(ManagedLedgerException exception,
-                                                            Object ctx) {
-                                    asyncResponse.resume(new 
RestException(exception));
-                                }
+    protected CompletableFuture<Response> internalGetMessageById(long 
ledgerId, long entryId, boolean authoritative) {
+        CompletableFuture<Void> future;
+        if (topicName.isGlobal()) {
+            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+        } else {
+            future = CompletableFuture.completedFuture(null);
+        }
+        return future.thenCompose(__ -> {
+            if (topicName.isPartitioned()) {
+                return CompletableFuture.completedFuture(null);
+            } else {
+                return getPartitionedTopicMetadataAsync(topicName, 
authoritative, false)
+                        .thenAccept(topicMetadata -> {
+                            if (topicMetadata.partitions > 0) {
+                                log.warn("[{}] Not supported getMessageById 
operation on partitioned-topic {}",
+                                        clientAppId(), topicName);
+                                throw new 
RestException(Status.METHOD_NOT_ALLOWED,
+                                        "GetMessageById is not allowed on 
partitioned-topic");
+                            }
+                        });
 
-                                @Override
-                                public void readEntryComplete(Entry entry, 
Object ctx) {
-                                    try {
-                                        
asyncResponse.resume(generateResponseWithEntry(entry));
-                                    } catch (IOException exception) {
-                                        asyncResponse.resume(new 
RestException(exception));
-                                    } finally {
-                                        if (entry != null) {
-                                            entry.release();
-                                        }
-                                    }
+            }
+        })
+        .thenCompose(ignore -> validateTopicOwnershipAsync(topicName, 
authoritative))
+        .thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.PEEK_MESSAGES))
+        .thenCompose(__ -> getTopicReferenceAsync(topicName))
+        .thenCompose(topic -> {
+            CompletableFuture<Response> results = new CompletableFuture<>();
+            ManagedLedgerImpl ledger =
+                    (ManagedLedgerImpl) ((PersistentTopic) 
topic).getManagedLedger();
+            ledger.asyncReadEntry(new PositionImpl(ledgerId, entryId),
+                    new AsyncCallbacks.ReadEntryCallback() {
+                        @Override
+                        public void readEntryFailed(ManagedLedgerException 
exception,
+                                                    Object ctx) {
+                            throw new RestException(exception);
+                        }
+
+                        @Override
+                        public void readEntryComplete(Entry entry, Object ctx) 
{
+                            try {
+                                
results.complete(generateResponseWithEntry(entry));
+                            } catch (IOException exception) {
+                                throw new RestException(exception);
+                            } finally {
+                                if (entry != null) {
+                                    entry.release();
                                 }
+                            }
+                        }
 
                                 @Override
                                 public String toString() {
                                     return String.format("Topic [%s] internal 
get message by id",
                                             
PersistentTopicsBase.this.topicName);
                                 }
-                            }, null);
-                }).exceptionally(ex -> {
-                    // If the exception is not redirect exception we need to 
log it.
-                    if (!isRedirectException(ex)) {
-                        log.error("[{}] Failed to get message with ledgerId {} 
entryId {} from {}",
-                                clientAppId(), ledgerId, entryId, topicName, 
ex);
-                    }
-                    resumeAsyncResponseExceptionally(asyncResponse, ex);
-                    return null;
-                });
+                    }, null);
+            return results;
+        });
     }
 
     protected CompletableFuture<MessageId> 
internalGetMessageIdByTimestampAsync(long timestamp, boolean authoritative) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 8e3f3adbfee..45ee532685f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -814,14 +814,18 @@ public class PersistentTopics extends PersistentTopicsBase {
                                @PathParam("topic") @Encoded String 
encodedTopic, @PathParam("ledgerId") Long ledgerId,
                                @PathParam("entryId") Long entryId,
                                @QueryParam("authoritative") 
@DefaultValue("false") boolean authoritative) {
-        try {
-            validateTopicName(property, cluster, namespace, encodedTopic);
-            internalGetMessageById(asyncResponse, ledgerId, entryId, 
authoritative);
-        } catch (WebApplicationException wae) {
-            asyncResponse.resume(wae);
-        } catch (Exception e) {
-            asyncResponse.resume(new RestException(e));
-        }
+        validateTopicName(property, cluster, namespace, encodedTopic);
+        internalGetMessageById(ledgerId, entryId, authoritative)
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(ex -> {
+                    // If the exception is not redirect exception we need to 
log it.
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to get message with ledgerId {} 
entryId {} from {}",
+                                clientAppId(), ledgerId, entryId, topicName, 
ex);
+                    }
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+        });
     }
 
     @GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 23f887e84d2..ebb4f3a34de 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1886,14 +1886,18 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("entryId") long entryId,
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
-        try {
-            validateTopicName(tenant, namespace, encodedTopic);
-            internalGetMessageById(asyncResponse, ledgerId, entryId, 
authoritative);
-        } catch (WebApplicationException wae) {
-            asyncResponse.resume(wae);
-        } catch (Exception e) {
-            asyncResponse.resume(new RestException(e));
-        }
+        validateTopicName(tenant, namespace, encodedTopic);
+        internalGetMessageById(ledgerId, entryId, authoritative)
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(ex -> {
+                    // If the exception is not redirect exception we need to 
log it.
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to get message with ledgerId {} 
entryId {} from {}",
+                                clientAppId(), ledgerId, entryId, topicName, 
ex);
+                    }
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @GET

Reply via email to