ivankelly commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r164794996
########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java ########## @@ -851,6 +849,31 @@ protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) { } } + @Override + protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId) { + checkArgument(state == State.Connected); + + CompletableFuture<Consumer> consumerFuture = consumers.get(getLastMessageId.getConsumerId()); + + if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) { + Consumer consumer = consumerFuture.getNow(null); + long requestId = getLastMessageId.getRequestId(); + + Position position = consumer.getSubscription().getTopic().getLastMessageId(); + + log.info("[{}] [{}][{}] Get LastMessageId {}", remoteAddress, + consumer.getSubscription().getTopic().getName(), consumer.getSubscription().getName(), position); + MessageIdData messageId = MessageIdData.newBuilder() Review comment: Actually, maybe it would make more sense to add partition at the client side? From the broker point of view, a partition really just looks like another topic, it has it's own managed ledger. It's only from the client side that partition and batch id are important. (@merlimat @sijie what are your opinions on this)? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services