This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 6f36aee Rest endpoint to query compaction status (#1501) 6f36aee is described below commit 6f36aeea72178014d5aed980f38585726ca588d7 Author: Ivan Kelly <iv...@apache.org> AuthorDate: Fri Apr 6 23:12:17 2018 +0200 Rest endpoint to query compaction status (#1501) Currently returns whether compaction has not run, is running, is complete or has failed. If it has failed, the last error is returned also. --- .../broker/admin/impl/PersistentTopicsBase.java | 7 +++ .../pulsar/broker/admin/v1/PersistentTopics.java | 14 ++++++ .../pulsar/broker/admin/v2/PersistentTopics.java | 15 +++++++ .../broker/service/persistent/PersistentTopic.java | 24 ++++++++++ .../apache/pulsar/broker/admin/AdminApiTest.java | 36 +++++++++++++++ .../pulsar/client/admin/PersistentTopics.java | 8 ++++ .../admin/internal/PersistentTopicsImpl.java | 13 ++++++ .../pulsar/common/compaction/CompactionStatus.java | 52 ++++++++++++++++++++++ 8 files changed, 169 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index aa52c64..03a435d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -74,6 +74,7 @@ import org.apache.pulsar.common.api.Commands; import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue; import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition; +import org.apache.pulsar.common.compaction.CompactionStatus; import org.apache.pulsar.common.compression.CompressionCodec; import org.apache.pulsar.common.compression.CompressionCodecProvider; import org.apache.pulsar.common.naming.TopicDomain; @@ -1086,6 +1087,12 @@ public class PersistentTopicsBase extends AdminResource { } } + protected CompactionStatus internalCompactionStatus(boolean authoritative) { + validateAdminOperationOnTopic(authoritative); + PersistentTopic topic = (PersistentTopic) getTopicReference(topicName); + return topic.compactionStatus(); + } + public static CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(PulsarService pulsar, String clientAppId, AuthenticationDataSource authenticationData, TopicName topicName) { CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java index f28d229..b03b533 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java @@ -40,6 +40,7 @@ import javax.ws.rs.core.Response; import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.common.compaction.CompactionStatus; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.PartitionedTopicStats; @@ -435,4 +436,17 @@ public class PersistentTopics extends PersistentTopicsBase { internalTriggerCompaction(authoritative); } + @GET + @Path("/{property}/{cluster}/{namespace}/{topic}/compaction") + @ApiOperation(value = "Get the status of a compaction operation for a topic.") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 405, message = "Operation not allowed on persistent topic"), + @ApiResponse(code = 404, message = "Topic does not exist, or compaction hasn't run") }) + public CompactionStatus compactionStatus( + @PathParam("property") String property, @PathParam("cluster") String cluster, + @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateTopicName(property, cluster, namespace, encodedTopic); + return internalCompactionStatus(authoritative); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 3ffdde8..4b88743 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -40,6 +40,7 @@ import javax.ws.rs.core.Response; import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.common.compaction.CompactionStatus; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.PartitionedTopicStats; @@ -407,4 +408,18 @@ public class PersistentTopics extends PersistentTopicsBase { validateTopicName(property, namespace, encodedTopic); internalTriggerCompaction(authoritative); } + + @GET + @Path("/{property}/{namespace}/{topic}/compaction") + @ApiOperation(value = "Get the status of a compaction operation for a topic.") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 405, message = "Operation not allowed on persistent topic"), + @ApiResponse(code = 404, message = "Topic does not exist, or compaction hasn't run") }) + public CompactionStatus compactionStatus( + @PathParam("property") String property, + @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateTopicName(property, namespace, encodedTopic); + return internalCompactionStatus(authoritative); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 6f7e4fb..2d63fb6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -29,6 +29,8 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLongFieldUpdater; @@ -84,6 +86,7 @@ import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; +import org.apache.pulsar.common.compaction.CompactionStatus; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.ConsumerStats; @@ -1622,6 +1625,27 @@ public class PersistentTopic implements Topic, AddEntryCallback { } } + + public synchronized CompactionStatus compactionStatus() { + final CompletableFuture<Long> current; + synchronized (this) { + current = currentCompaction; + } + if (!current.isDone()) { + return CompactionStatus.forStatus(CompactionStatus.Status.RUNNING); + } else { + try { + if (current.join() == COMPACTION_NEVER_RUN) { + return CompactionStatus.forStatus(CompactionStatus.Status.NOT_RUN); + } else { + return CompactionStatus.forStatus(CompactionStatus.Status.SUCCESS); + } + } catch (CancellationException | CompletionException e) { + return CompactionStatus.forError(e.getMessage()); + } + } + } + private static final Logger log = LoggerFactory.getLogger(PersistentTopic.class); @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index 80fbf18..c53db5b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -70,6 +70,7 @@ import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.compaction.CompactionStatus; import org.apache.pulsar.common.lookup.data.LookupData; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceBundleFactory; @@ -1927,4 +1928,39 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { verify(compactor, times(2)).compact(topicName); } + @Test + public void testCompactionStatus() throws Exception { + String topicName = "persistent://prop-xyz/use/ns1/topic1"; + + // create a topic by creating a producer + pulsarClient.newProducer().topic(topicName).create().close(); + assertNotNull(pulsar.getBrokerService().getTopicReference(topicName)); + + assertEquals(admin.persistentTopics().compactionStatus(topicName).status, + CompactionStatus.Status.NOT_RUN); + + // mock actual compaction, we don't need to really run it + CompletableFuture<Long> promise = new CompletableFuture<Long>(); + Compactor compactor = pulsar.getCompactor(); + doReturn(promise).when(compactor).compact(topicName); + admin.persistentTopics().triggerCompaction(topicName); + + assertEquals(admin.persistentTopics().compactionStatus(topicName).status, + CompactionStatus.Status.RUNNING); + + promise.complete(1L); + + assertEquals(admin.persistentTopics().compactionStatus(topicName).status, + CompactionStatus.Status.SUCCESS); + + CompletableFuture<Long> errorPromise = new CompletableFuture<Long>(); + doReturn(errorPromise).when(compactor).compact(topicName); + admin.persistentTopics().triggerCompaction(topicName); + errorPromise.completeExceptionally(new Exception("Failed at something")); + + assertEquals(admin.persistentTopics().compactionStatus(topicName).status, + CompactionStatus.Status.ERROR); + assertTrue(admin.persistentTopics().compactionStatus(topicName) + .lastError.contains("Failed at something")); + } } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PersistentTopics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PersistentTopics.java index 93df290..364fec4 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PersistentTopics.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PersistentTopics.java @@ -30,6 +30,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException; import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.common.compaction.CompactionStatus; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.PartitionedTopicStats; @@ -909,4 +910,11 @@ public interface PersistentTopics { * The topic on which to trigger compaction */ void triggerCompaction(String topic) throws PulsarAdminException; + + /** + * Check the status of an ongoing compaction for a topic. + * + * @param topic The topic whose compaction status we wish to check + */ + CompactionStatus compactionStatus(String topic) throws PulsarAdminException; } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PersistentTopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PersistentTopicsImpl.java index 6193d08..3121739 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PersistentTopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PersistentTopicsImpl.java @@ -54,6 +54,7 @@ import org.apache.pulsar.common.api.Commands; import org.apache.pulsar.common.api.proto.PulsarApi; import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue; import org.apache.pulsar.common.api.proto.PulsarApi.SingleMessageMetadata; +import org.apache.pulsar.common.compaction.CompactionStatus; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; @@ -723,6 +724,18 @@ public class PersistentTopicsImpl extends BaseResource implements PersistentTopi } } + @Override + public CompactionStatus compactionStatus(String topic) + throws PulsarAdminException { + try { + TopicName tn = validateTopic(topic); + return request(topicPath(tn, "compaction")) + .get(CompactionStatus.class); + } catch (Exception e) { + throw getApiException(e); + } + } + private WebTarget namespacePath(NamespaceName namespace, String... parts) { final WebTarget base = namespace.isV2() ? adminV2PersistentTopics : adminPersistentTopics; WebTarget namespacePath = base.path(namespace.toString()); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/compaction/CompactionStatus.java b/pulsar-common/src/main/java/org/apache/pulsar/common/compaction/CompactionStatus.java new file mode 100644 index 0000000..9020c21 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/compaction/CompactionStatus.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.compaction; + +/** + * Status of compaction for a topic. + */ +public class CompactionStatus { + public enum Status { + NOT_RUN, + RUNNING, + SUCCESS, + ERROR + }; + + public Status status; + public String lastError; + + public CompactionStatus() { + this.status = Status.NOT_RUN; + this.lastError = ""; + } + + private CompactionStatus(Status status, String lastError) { + this.status = status; + this.lastError = lastError; + } + + public static CompactionStatus forStatus(Status status) { + return new CompactionStatus(status, ""); + } + + public static CompactionStatus forError(String lastError) { + return new CompactionStatus(Status.ERROR, lastError); + } +} -- To stop receiving notification emails like this one, please contact mme...@apache.org.