This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new f493676 support GetLastMessageId api (#3196) f493676 is described below commit f4936763af1c1dab54d031ae355e40d66d3a602e Author: legendtkl <taok...@gmail.com> AuthorDate: Fri Dec 21 13:19:28 2018 +0800 support GetLastMessageId api (#3196) issue ticket: https://github.com/apache/pulsar/issues/3162 (Fixes #3162) the pr is straightforward, and it exposes the Persistent Topic GetLastMessageId to Rest API: /admin/v2/persistent/{tenant}/{namespace}/{topic}/lastMessageId --- .../broker/admin/impl/PersistentTopicsBase.java | 17 +++ .../pulsar/broker/admin/v2/PersistentTopics.java | 14 ++ .../broker/admin/AdminApiGetLastMessageIdTest.java | 142 +++++++++++++++++++++ 3 files changed, 173 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 1605d8c..de1af66 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1450,4 +1450,21 @@ public class PersistentTopicsBase extends AdminResource { } return; } + + protected MessageId internalGetLastMessageId(boolean authoritative) { + validateAdminOperationOnTopic(authoritative); + + if (!(getTopicReference(topicName) instanceof PersistentTopic)) { + log.error("[{}] Not supported operation of non-persistent topic {}", clientAppId(), topicName); + throw new RestException(Status.METHOD_NOT_ALLOWED, + "GetLastMessageId on a non-persistent topic is not allowed"); + } + PersistentTopic topic = (PersistentTopic) getTopicReference(topicName); + Position position = topic.getLastMessageId(); + int partitionIndex = TopicName.getPartitionIndex(topic.getName()); + + MessageId messageId = new MessageIdImpl(((PositionImpl)position).getLedgerId(), ((PositionImpl)position).getEntryId(), partitionIndex); + + return messageId; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 437407b..350da17 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -498,4 +498,18 @@ public class PersistentTopics extends PersistentTopicsBase { validateTopicName(tenant, namespace, encodedTopic); return internalOffloadStatus(authoritative); } + + @GET + @Path("/{tenant}/{namespace}/{topic}/lastMessageId") + @ApiOperation(value = "Return the last commit message id of topic") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 405, message = "Operation not allowed on persistent topic"), + @ApiResponse(code = 404, message = "Topic does not exist")}) + public MessageId getLastMessageId(@PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateTopicName(tenant, namespace, encodedTopic); + return internalGetLastMessageId(authoritative); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiGetLastMessageIdTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiGetLastMessageIdTest.java new file mode 100644 index 0000000..cc5cf0e --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiGetLastMessageIdTest.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.admin; + +import com.google.common.collect.Sets; +import org.apache.pulsar.broker.admin.v2.PersistentTopics; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.authentication.AuthenticationDataHttps; +import org.apache.pulsar.broker.web.PulsarWebResource; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageRoutingMode; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import javax.ws.rs.core.UriInfo; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Future; + +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; + +public class AdminApiGetLastMessageIdTest extends MockedPulsarServiceBaseTest { + + private PersistentTopics persistentTopics; + private final String testTenant = "my-tenant"; + private final String testLocalCluster = "use"; + private final String testNamespace = "my-namespace"; + protected Field uriField; + protected UriInfo uriInfo; + + @BeforeClass + public void initPersistentTopics() throws Exception { + uriField = PulsarWebResource.class.getDeclaredField("uri"); + uriField.setAccessible(true); + uriInfo = mock(UriInfo.class); + } + + @Override + @BeforeMethod + protected void setup() throws Exception { + super.internalSetup(); + admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString())); + admin.tenants().createTenant("prop", + new TenantInfo(Sets.newHashSet("appid1"), Sets.newHashSet("test"))); + admin.namespaces().createNamespace("prop/ns-abc"); + admin.namespaces().setNamespaceReplicationClusters("prop/ns-abc", Sets.newHashSet("test")); + persistentTopics = spy(new PersistentTopics()); + persistentTopics.setServletContext(new MockServletContext()); + persistentTopics.setPulsar(pulsar); + + doReturn(mockZookKeeper).when(persistentTopics).globalZk(); + doReturn(mockZookKeeper).when(persistentTopics).localZk(); + doReturn(pulsar.getConfigurationCache().propertiesCache()).when(persistentTopics).tenantsCache(); + doReturn(pulsar.getConfigurationCache().policiesCache()).when(persistentTopics).policiesCache(); + doReturn(false).when(persistentTopics).isRequestHttps(); + doReturn(null).when(persistentTopics).originalPrincipal(); + doReturn("test").when(persistentTopics).clientAppId(); + doReturn("persistent").when(persistentTopics).domain(); + doNothing().when(persistentTopics).validateAdminAccessForTenant(this.testTenant); + doReturn(mock(AuthenticationDataHttps.class)).when(persistentTopics).clientAuthData(); + } + + @Override + @AfterMethod + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testGetLastMessageId() throws Exception { + try { + persistentTopics.getLastMessageId(testTenant, testNamespace, "my-topic", true); + } catch (Exception e) { + //System.out.println(e.getMessage()); + Assert.assertEquals("Topic not found", e.getMessage()); + } + + String key = "legendtkl"; + final String topicName = "persistent://prop/ns-abc/my-topic"; + final String messagePredicate = "my-message-" + key + "-"; + final int numberOfMessages = 30; + + // 2. Create Producer + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + + // 3. Publish message and get message id + for (int i = 0; i < numberOfMessages; i++) { + String message = messagePredicate + i; + producer.send(message.getBytes()); + } + + MessageId id = persistentTopics.getLastMessageId("prop", "ns-abc", "my-topic", true); + System.out.println(id.toString()); + Assert.assertTrue(((MessageIdImpl)id).getLedgerId() >= 0); + Assert.assertEquals(numberOfMessages-1, ((MessageIdImpl)id).getEntryId()); + + // send more numberOfMessages messages, the last message id should be numberOfMessages*2-1 + for (int i = 0; i < numberOfMessages; i++) { + String message = messagePredicate + i; + producer.send(message.getBytes()); + } + id = persistentTopics.getLastMessageId("prop", "ns-abc", "my-topic", true); + System.out.println(id.toString()); + Assert.assertTrue(((MessageIdImpl)id).getLedgerId() > 0); + Assert.assertEquals( 2 * numberOfMessages -1, ((MessageIdImpl)id).getEntryId()); + + System.out.println(id.toString()); + } +}