This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1e936778691 [improve][pip] PIP-360 Add admin API to display Schema
metadata (#22938)
1e936778691 is described below
commit 1e936778691b0cee54b6fe34b53ebc1593f5ae92
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Thu Oct 3 23:11:36 2024 -0700
[improve][pip] PIP-360 Add admin API to display Schema metadata (#22938)
---
.../broker/admin/impl/SchemasResourceBase.java | 9 ++++
.../pulsar/broker/admin/v1/SchemasResource.java | 32 +++++++++++++
.../pulsar/broker/admin/v2/SchemasResource.java | 30 ++++++++++++
.../service/schema/BookkeeperSchemaStorage.java | 20 ++++++++
.../java/org/apache/pulsar/schema/SchemaTest.java | 26 +++++++++++
.../org/apache/pulsar/client/admin/Schemas.java | 16 +++++++
.../common/policies/data/SchemaMetadata.java | 48 +++++++++++++++++++
.../pulsar/client/admin/internal/SchemasImpl.java | 18 ++++++++
.../org/apache/pulsar/admin/cli/CmdSchemas.java | 13 ++++++
.../org/apache/pulsar/admin/cli/TestCmdSchema.java | 54 ++++++++++++++++++++++
10 files changed, 266 insertions(+)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
index 286366c8b58..886db9c7abb 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
@@ -31,6 +31,7 @@ import java.util.stream.Collectors;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
import
org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.web.RestException;
@@ -38,6 +39,7 @@ import org.apache.pulsar.client.impl.schema.SchemaUtils;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
+import org.apache.pulsar.common.policies.data.SchemaMetadata;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.protocol.schema.GetAllVersionsSchemaResponse;
import org.apache.pulsar.common.protocol.schema.GetSchemaResponse;
@@ -105,6 +107,13 @@ public class SchemasResourceBase extends AdminResource {
});
}
+ public CompletableFuture<SchemaMetadata> getSchemaMetadataAsync(boolean
authoritative) {
+ String schemaId = getSchemaId();
+ BookkeeperSchemaStorage storage = (BookkeeperSchemaStorage)
pulsar().getSchemaStorage();
+ return validateOwnershipAndOperationAsync(authoritative,
TopicOperation.GET_METADATA)
+ .thenCompose(__ -> storage.getSchemaMetadata(schemaId));
+ }
+
public CompletableFuture<SchemaVersion> deleteSchemaAsync(boolean
authoritative, boolean force) {
return validateDestinationAndAdminOperationAsync(authoritative)
.thenCompose(__ -> {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/SchemasResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/SchemasResource.java
index edc600707a1..0d6c3814bf8 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/SchemasResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/SchemasResource.java
@@ -43,6 +43,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.admin.impl.SchemasResourceBase;
import
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import
org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
+import org.apache.pulsar.common.policies.data.SchemaMetadata;
import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse;
import org.apache.pulsar.common.protocol.schema.GetAllVersionsSchemaResponse;
import org.apache.pulsar.common.protocol.schema.GetSchemaResponse;
@@ -170,6 +171,37 @@ public class SchemasResource extends SchemasResourceBase {
});
}
+ @GET
+ @Path("/{tenant}/{cluster}/{namespace}/{topic}/metadata")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Get the schema metadata of a topic", response =
SchemaMetadata.class)
+ @ApiResponses(value = {
+ @ApiResponse(code = 307, message = "Current broker doesn't serve
the namespace of this topic"),
+ @ApiResponse(code = 401, message = "Client is not authorized or
Don't have admin permission"),
+ @ApiResponse(code = 403, message = "Client is not authenticated"),
+ @ApiResponse(code = 404,
+ message = "Tenant or Namespace or Topic doesn't exist; or
Schema is not found for this topic"),
+ @ApiResponse(code = 412, message = "Failed to find the ownership
for the topic"),
+ @ApiResponse(code = 500, message = "Internal Server Error"),
+ })
+ public void getSchemaMetadata(
+ @PathParam("tenant") String tenant,
+ @PathParam("cluster") String cluster,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") String topic,
+ @QueryParam("authoritative") @DefaultValue("false") boolean
authoritative,
+ @Suspended final AsyncResponse response
+ ) {
+ validateTopicName(tenant, cluster, namespace, topic);
+ getSchemaMetadataAsync(authoritative)
+ .thenAccept(response::resume)
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to get schema metadata for topic
{}", clientAppId(), topicName, ex);
+ resumeAsyncResponseExceptionally(response, ex);
+ return null;
+ });
+ }
+
@DELETE
@Path("/{tenant}/{cluster}/{namespace}/{topic}/schema")
@Produces(MediaType.APPLICATION_JSON)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
index dd8ed58c853..07758436f6c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
@@ -166,6 +166,36 @@ public class SchemasResource extends SchemasResourceBase {
});
}
+ @GET
+ @Path("/{tenant}/{namespace}/{topic}/metadata")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Get the schema metadata of a topic", response =
GetAllVersionsSchemaResponse.class)
+ @ApiResponses(value = {
+ @ApiResponse(code = 307, message = "Current broker doesn't serve
the namespace of this topic"),
+ @ApiResponse(code = 401, message = "Client is not authorized or
Don't have admin permission"),
+ @ApiResponse(code = 403, message = "Client is not authenticated"),
+ @ApiResponse(code = 404,
+ message = "Tenant or Namespace or Topic doesn't exist; or
Schema is not found for this topic"),
+ @ApiResponse(code = 412, message = "Failed to find the ownership
for the topic"),
+ @ApiResponse(code = 500, message = "Internal Server Error"),
+ })
+ public void getSchemaMetadata(
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") String topic,
+ @QueryParam("authoritative") @DefaultValue("false") boolean
authoritative,
+ @Suspended final AsyncResponse response
+ ) {
+ validateTopicName(tenant, namespace, topic);
+ getSchemaMetadataAsync(authoritative)
+ .thenAccept(response::resume)
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to get schema metadata for topic
{}", clientAppId(), topicName, ex);
+ resumeAsyncResponseExceptionally(response, ex);
+ return null;
+ });
+ }
+
@DELETE
@Path("/{tenant}/{namespace}/{topic}/schema")
@Produces(MediaType.APPLICATION_JSON)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index 85c8aa06458..f68cdd6473e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -52,8 +52,11 @@ import
org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry;
+import
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator;
import
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
+import org.apache.pulsar.common.policies.data.SchemaMetadata;
import org.apache.pulsar.common.protocol.schema.SchemaStorage;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.protocol.schema.StoredSchema;
@@ -554,6 +557,23 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
o.map(r -> new LocatorEntry(r.getValue(),
r.getStat().getVersion())));
}
+ public CompletableFuture<SchemaMetadata> getSchemaMetadata(String schema) {
+ return getLocator(schema).thenApply(locator -> {
+ if (!locator.isPresent()) {
+ return null;
+ }
+ SchemaLocator sl = locator.get().locator;
+ SchemaMetadata metadata = new SchemaMetadata();
+ IndexEntry info = sl.getInfo();
+ metadata.info = new
SchemaMetadata.Entry(info.getPosition().getLedgerId(),
info.getPosition().getEntryId(),
+ info.getVersion());
+ metadata.index = sl.getIndexList() == null ? null
+ : sl.getIndexList().stream().map(i -> new
SchemaMetadata.Entry(i.getPosition().getLedgerId(),
+ i.getPosition().getEntryId(),
i.getVersion())).collect(Collectors.toList());
+ return metadata;
+ });
+ }
+
@NotNull
private CompletableFuture<Long> addEntry(LedgerHandle ledgerHandle,
SchemaStorageFormat.SchemaEntry entry) {
final CompletableFuture<Long> future = new CompletableFuture<>();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index ae9ea6d5ae6..ab82f981b5d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -23,6 +23,7 @@ import static
org.apache.pulsar.schema.compatibility.SchemaCompatibilityCheckTes
import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertThrows;
@@ -84,6 +85,7 @@ import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
+import org.apache.pulsar.common.policies.data.SchemaMetadata;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
@@ -1492,4 +1494,28 @@ public class SchemaTest extends
MockedPulsarServiceBaseTest {
consumer.close();
producer.close();
}
+
+ @Test
+ public void testTopicSchemaMetadata() throws Exception {
+ final String tenant = PUBLIC_TENANT;
+ final String namespace = "test-namespace-" + randomName(16);
+ final String topicOne = "metadata-topic";
+ final String topicName = TopicName.get(TopicDomain.persistent.value(),
tenant, namespace, topicOne).toString();
+
+ admin.namespaces().createNamespace(tenant + "/" + namespace,
Sets.newHashSet(CLUSTER_NAME));
+
+ @Cleanup
+ Producer<Schemas.PersonTwo> producer = pulsarClient
+ .newProducer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo>
builder().withAlwaysAllowNull(false)
+
.withSupportSchemaVersioning(true).withPojo(Schemas.PersonTwo.class).build()))
+ .topic(topicName).create();
+
+ SchemaMetadata metadata = admin.schemas().getSchemaMetadata(topicName);
+
+ assertNotNull(metadata);
+ assertNotNull(metadata.info);
+ assertNotEquals(metadata.info.getLedgerId(), 0);
+ assertEquals(metadata.info.getEntryId(), 0);
+ assertEquals(metadata.index.size(), 1);
+ }
}
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Schemas.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Schemas.java
index 9a1eb67d2e5..ca8bed25370 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Schemas.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Schemas.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.admin;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.common.policies.data.SchemaMetadata;
import org.apache.pulsar.common.protocol.schema.IsCompatibilityResponse;
import org.apache.pulsar.common.protocol.schema.PostSchemaPayload;
import org.apache.pulsar.common.schema.SchemaInfo;
@@ -233,4 +234,19 @@ public interface Schemas {
* @param topic topic name, in fully qualified format
*/
CompletableFuture<List<SchemaInfo>> getAllSchemasAsync(String topic);
+
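+ /**
+ * Get schema metadata of the <tt>topic</tt>.
+ *
+ * @param topic topic name, in fully qualified format
+ * @return the schema metadata of the topic (ledger id, entry id and version of its schema entries)
+ * @throws PulsarAdminException
+ */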
+ SchemaMetadata getSchemaMetadata(String topic) throws PulsarAdminException;
+
+ /**
+ * Get schema metadata of the <tt>topic</tt> asynchronously.
+ *
+ * @param topic topic name, in fully qualified format
+ * @return a future that completes with the schema metadata of the topic
+ */
+ CompletableFuture<SchemaMetadata> getSchemaMetadataAsync(String topic);
}
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SchemaMetadata.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SchemaMetadata.java
new file mode 100644
index 00000000000..ff6ba6e8649
--- /dev/null
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SchemaMetadata.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data;
+
+import java.util.List;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
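+/**
+ * Schema metadata of a topic: the ledger id, entry id and version of the current
+ * schema entry ({@code info}) and of each entry in the schema index ({@code index}).
+ */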
+@Data
+public class SchemaMetadata {
+
+ public Entry info;
+ public List<Entry> index;
+
+ @Data
+ @AllArgsConstructor
+ @NoArgsConstructor
+ public static class Entry {
+ private long ledgerId;
+ private long entryId;
+ private long version;
+
+ @Override
+ public String toString() {
+ return String.format("ledgerId=[%d], entryId=[%d], version=[%d]",
ledgerId, entryId, version);
+ }
+ }
+}
\ No newline at end of file
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
index 28b435ab567..7f2383e1e52 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.client.admin.Schemas;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.SchemaMetadata;
import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse;
import org.apache.pulsar.common.protocol.schema.GetAllVersionsSchemaResponse;
import org.apache.pulsar.common.protocol.schema.GetSchemaResponse;
@@ -276,6 +277,19 @@ public class SchemasImpl extends BaseResource implements
Schemas {
.collect(Collectors.toList()));
}
+ @Override
+ public SchemaMetadata getSchemaMetadata(String topic) throws
PulsarAdminException {
+ return sync(() -> getSchemaMetadataAsync(topic));
+ }
+
+ @Override
+ public CompletableFuture<SchemaMetadata> getSchemaMetadataAsync(String
topic) {
+ TopicName tn = TopicName.get(topic);
+ WebTarget path = metadata(tn);
+ return asyncGetRequest(path, new FutureCallback<SchemaMetadata>(){});
+ }
+
+
private WebTarget schemaPath(TopicName topicName) {
return topicPath(topicName, "schema");
}
@@ -292,6 +306,10 @@ public class SchemasImpl extends BaseResource implements
Schemas {
return topicPath(topicName, "compatibility");
}
+ private WebTarget metadata(TopicName topicName) {
+ return topicPath(topicName, "metadata");
+ }
+
private WebTarget topicPath(TopicName topic, String... parts) {
final WebTarget base = topic.isV2() ? adminV2 : adminV1;
WebTarget topicPath = base.path(topic.getRestPath(false));
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSchemas.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSchemas.java
index ab8fdc1f013..9131f11f3d3 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSchemas.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSchemas.java
@@ -44,6 +44,7 @@ public class CmdSchemas extends CmdBase {
addCommand("delete", new DeleteSchema());
addCommand("upload", new UploadSchema());
addCommand("extract", new ExtractSchema());
+ addCommand("metadata", new GetSchemaMetadata());
addCommand("compatibility", new TestCompatibility());
}
@@ -77,6 +78,18 @@ public class CmdSchemas extends CmdBase {
}
}
+ @Command(description = "Get the schema metadata for a topic")
+ private class GetSchemaMetadata extends CliCommand {
+ @Parameters(description = "persistent://tenant/namespace/topic", arity
= "1")
+ private String topicName;
+
+ @Override
+ void run() throws Exception {
+ String topic = validateTopicName(topicName);
+ print(getAdmin().schemas().getSchemaMetadata(topic));
+ }
+ }
+
@Command(description = "Delete all versions schema of a topic")
private class DeleteSchema extends CliCommand {
@Parameters(description = "persistent://tenant/namespace/topic", arity
= "1")
diff --git
a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSchema.java
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSchema.java
new file mode 100644
index 00000000000..b61ac3b8ef3
--- /dev/null
+++
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSchema.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.admin.cli;
+
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.Schemas;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class TestCmdSchema {
+
+ private PulsarAdmin pulsarAdmin;
+
+ private CmdSchemas cmdSchemas;
+
+ private Schemas schemas;
+
+ @BeforeMethod
+ public void setup() throws Exception {
+ pulsarAdmin = mock(PulsarAdmin.class);
+ schemas = mock(Schemas.class);
+ when(pulsarAdmin.schemas()).thenReturn(schemas);
+ cmdSchemas = spy(new CmdSchemas(() -> pulsarAdmin));
+ }
+
+ @Test
+ public void testCmdClusterConfigFile() throws Exception {
+ String topic = "persistent://tenant/ns1/t1";
+ cmdSchemas.run(new String[]{"metadata", topic});
+ verify(schemas).getSchemaMetadata(eq(topic));
+ }
+}