BewareMyPower commented on code in PR #24222:
URL: https://github.com/apache/pulsar/pull/24222#discussion_r2126217163
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -5489,4 +5489,131 @@ protected
CompletableFuture<AutoSubscriptionCreationOverride> internalGetAutoSub
return null;
}));
}
+
+ protected CompletableFuture<MessageId>
internalGetMessageIDByIndexAsync(Long index, boolean authoritative) {
+ if (!pulsar().getBrokerService().isBrokerEntryMetadataEnabled()) {
+ return FutureUtil.failedFuture(new
RestException(Status.PRECONDITION_FAILED,
+ "GetMessageIDByIndex is not allowed when broker entry
metadata is disabled"));
+ }
+ if (index == null || index < 0) {
+ return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND,
+ "Invalid message index: " + index));
+ }
+ int partitionIndex = topicName.getPartitionIndex();
+ CompletableFuture<Void> future =
validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES);
+ return future.thenCompose(__ -> {
+ if (topicName.isGlobal()) {
+ return
validateGlobalNamespaceOwnershipAsync(namespaceName);
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ }).thenCompose(__ -> {
+ if (topicName.isPartitioned()) {
+ return CompletableFuture.completedFuture(null);
+ } else {
+ return getPartitionedTopicMetadataAsync(topicName,
authoritative, false)
+ .thenAccept(topicMetadata -> {
+ if (topicMetadata.partitions > 0) {
+ log.warn("[{}] Not supported
getMessageIdByIndex operation on "
+ + "partitioned-topic
{}", clientAppId(), topicName);
+ throw new
RestException(Status.METHOD_NOT_ALLOWED,
+ "GetMessageIDByIndex is not
allowed on partitioned-topic");
+ }
+ });
+ }
+ }).thenCompose(ignore ->
validateTopicOwnershipAsync(topicName, authoritative))
+ .thenCompose(__ -> getTopicReferenceAsync(topicName))
+ .thenCompose(topic -> {
+ if (!(topic instanceof PersistentTopic persistentTopic)) {
+ log.error("[{}] Get message id by index on a
non-persistent topic {} is not allowed",
+ clientAppId(), topicName);
+ return FutureUtil.failedFuture(new
RestException(Status.METHOD_NOT_ALLOWED,
+ "Get message id by index on a non-persistent
topic is not allowed"));
+ }
+ ManagedLedger managedLedger =
persistentTopic.getManagedLedger();
+ return findMessageIndexByPosition(
+
PositionFactory.create(managedLedger.getFirstPosition().getLedgerId(), 0),
+ managedLedger)
+ .thenCompose(firstIndex -> {
+ if (index < firstIndex) {
+ return
CompletableFuture.completedFuture(PositionFactory.EARLIEST);
+ } else {
+ return
managedLedger.asyncFindPosition(entry -> {
+ try {
+ Long messageIndex =
getIndexFromEntry(entry);
+ if (messageIndex == null) {
+ return false; // Skip messages
without index
+ } else {
+ // If the message index is
less than the requested index,
+ // we continue searching
+ return messageIndex < index;
+ }
+ } catch (IOException e) {
+ log.error("Error deserialize
message for message position find", e);
+ return false;
+ } finally {
+ entry.release();
+ }
+ });
+ }
+ }).thenCompose(position -> {
+ Position lastPosition =
managedLedger.getLastConfirmedEntry();
+ if (position == null ||
position.compareTo(lastPosition) > 0) {
+ return FutureUtil.failedFuture(new
RestException(Status.NOT_FOUND,
+ "Message not found for index " +
index));
+ } else {
+ return
CompletableFuture.completedFuture(position);
+ }
+ });
+ }).thenCompose(position -> CompletableFuture.completedFuture(
+ new MessageIdImpl(position.getLedgerId(),
position.getEntryId(), partitionIndex)));
+ }
+
+ protected CompletableFuture<Long> findMessageIndexByPosition(Position
position, ManagedLedger managedLedger) {
+ CompletableFuture<Long> indexFuture = new CompletableFuture<>();
+ managedLedger.asyncReadEntry(position, new
AsyncCallbacks.ReadEntryCallback() {
+ @Override
+ public void readEntryComplete(Entry entry, Object ctx) {
+ try {
+ Long index = getIndexFromEntry(entry);
+ if (index == null) {
+ indexFuture.completeExceptionally(new
RestException(Status.PRECONDITION_FAILED,
+ "Broker entry metadata is not present in the
message"));
+ } else if (index < 0) {
+ indexFuture.completeExceptionally(new
RestException(Status.PRECONDITION_FAILED,
+ "Invalid message index: " + index));
+ } else {
+ indexFuture.complete(index);
+ }
+ } catch (IOException e) {
+ indexFuture.completeExceptionally(new
RestException(Status.INTERNAL_SERVER_ERROR,
+ "Failed to get index from entry: " +
e.getMessage()));
+ } finally {
+ entry.release();
+ }
+ }
+
+ @Override
+ public void readEntryFailed(ManagedLedgerException exception,
Object ctx) {
+ log.error("[{}] Failed to read position {} on topic {}",
+ clientAppId(), position, topicName, exception);
+ indexFuture.completeExceptionally(exception);
+ }
+ }, null);
+ return indexFuture;
+ }
+
+
+ private static Long getIndexFromEntry(Entry entry) throws IOException {
+ try {
+ final var brokerEntryMetadata =
Commands.parseBrokerEntryMetadataIfExist(entry.getDataBuffer());
+ if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex())
{
+ return brokerEntryMetadata.getIndex();
+ } else {
+ return null;
+ }
+ } catch (Throwable throwable) {
+ throw new IOException(throwable);
Review Comment:
The benefit of checked exceptions is to force the caller to handle the
exception, which should be a "known" exception. However, the `IOException`
thrown in this method is just a wrapper on an "unknown" exception.
A normal practice of exception handling should be:
- Fail fast or propagate the exception for unknown exceptions, because the
caller never knows how to handle it correctly
- Handle the known exception according to the method's API docs
Specifically, for unknown exceptions, if propagating it could bring
unexpected result, we should catch `Throwable` and handle the case well, for
example,
```java
@RequiredArgsConstructor
public class Main {
private final Function<Integer, Integer> func;
public CompletableFuture<Integer> f(int x) {
final var future = new CompletableFuture<Integer>();
new Thread(() -> future.complete(func.apply(x))).start();
return future;
}
public static void main(String[] args) {
final var main = new Main(x -> 1 / x);
final var result = main.f(0).join();
System.out.println(result);
}
}
```
To avoid the future of `Main#f` is never returned, we should add a catch
clause for it:
```java
try {
future.complete(func.apply(x));
} catch (Throwable throwable) {
future.completeExceptionally(throwable);
}
```
But adding an additional method with exception signature is bad:
```java
public CompletableFuture<Integer> f(int x) {
final var future = new CompletableFuture<Integer>();
new Thread(() -> {
try {
future.complete(apply(x));
} catch (IOException e) {
future.completeExceptionally(e);
}
}).start();
return future;
}
public int apply(int x) throws IOException {
try {
return func.apply(x);
} catch (Throwable throwable) {
throw new IOException(throwable);
}
}
```
In this case, it's better to keep `getIndexFromEntry` simple without the
exception signature and catch the `Throwable` when calling it like the
suggestion
[here](https://github.com/apache/pulsar/pull/24222/files#r2125780470).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]