This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e936778691 [improve][pip] PIP-360 Add admin API to display Schema 
metadata (#22938)
1e936778691 is described below

commit 1e936778691b0cee54b6fe34b53ebc1593f5ae92
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Thu Oct 3 23:11:36 2024 -0700

    [improve][pip] PIP-360 Add admin API to display Schema metadata (#22938)
---
 .../broker/admin/impl/SchemasResourceBase.java     |  9 ++++
 .../pulsar/broker/admin/v1/SchemasResource.java    | 32 +++++++++++++
 .../pulsar/broker/admin/v2/SchemasResource.java    | 30 ++++++++++++
 .../service/schema/BookkeeperSchemaStorage.java    | 20 ++++++++
 .../java/org/apache/pulsar/schema/SchemaTest.java  | 26 +++++++++++
 .../org/apache/pulsar/client/admin/Schemas.java    | 16 +++++++
 .../common/policies/data/SchemaMetadata.java       | 48 +++++++++++++++++++
 .../pulsar/client/admin/internal/SchemasImpl.java  | 18 ++++++++
 .../org/apache/pulsar/admin/cli/CmdSchemas.java    | 13 ++++++
 .../org/apache/pulsar/admin/cli/TestCmdSchema.java | 54 ++++++++++++++++++++++
 10 files changed, 266 insertions(+)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
index 286366c8b58..886db9c7abb 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
@@ -31,6 +31,7 @@ import java.util.stream.Collectors;
 import javax.ws.rs.core.Response;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
 import 
org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
 import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import org.apache.pulsar.broker.web.RestException;
@@ -38,6 +39,7 @@ import org.apache.pulsar.client.impl.schema.SchemaUtils;
 import org.apache.pulsar.client.internal.DefaultImplementation;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
+import org.apache.pulsar.common.policies.data.SchemaMetadata;
 import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.protocol.schema.GetAllVersionsSchemaResponse;
 import org.apache.pulsar.common.protocol.schema.GetSchemaResponse;
@@ -105,6 +107,13 @@ public class SchemasResourceBase extends AdminResource {
                 });
     }
 
+    public CompletableFuture<SchemaMetadata> getSchemaMetadataAsync(boolean 
authoritative) {
+        String schemaId = getSchemaId();
+        BookkeeperSchemaStorage storage = (BookkeeperSchemaStorage) 
pulsar().getSchemaStorage();
+        return validateOwnershipAndOperationAsync(authoritative, 
TopicOperation.GET_METADATA)
+                .thenCompose(__ -> storage.getSchemaMetadata(schemaId));
+    }
+
     public CompletableFuture<SchemaVersion> deleteSchemaAsync(boolean 
authoritative, boolean force) {
         return validateDestinationAndAdminOperationAsync(authoritative)
                 .thenCompose(__ -> {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/SchemasResource.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/SchemasResource.java
index edc600707a1..0d6c3814bf8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/SchemasResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/SchemasResource.java
@@ -43,6 +43,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.admin.impl.SchemasResourceBase;
 import 
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
 import 
org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
+import org.apache.pulsar.common.policies.data.SchemaMetadata;
 import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse;
 import org.apache.pulsar.common.protocol.schema.GetAllVersionsSchemaResponse;
 import org.apache.pulsar.common.protocol.schema.GetSchemaResponse;
@@ -170,6 +171,37 @@ public class SchemasResource extends SchemasResourceBase {
                 });
     }
 
+    @GET
+    @Path("/{tenant}/{cluster}/{namespace}/{topic}/metadata")
+    @Produces(MediaType.APPLICATION_JSON)
+    @ApiOperation(value = "Get the schema metadata of a topic", response = 
SchemaMetadata.class)
+    @ApiResponses(value = {
+            @ApiResponse(code = 307, message = "Current broker doesn't serve 
the namespace of this topic"),
+            @ApiResponse(code = 401, message = "Client is not authorized or 
Don't have admin permission"),
+            @ApiResponse(code = 403, message = "Client is not authenticated"),
+            @ApiResponse(code = 404,
+                    message = "Tenant or Namespace or Topic doesn't exist; or 
Schema is not found for this topic"),
+            @ApiResponse(code = 412, message = "Failed to find the ownership 
for the topic"),
+            @ApiResponse(code = 500, message = "Internal Server Error"),
+    })
+    public void getSchemaMetadata(
+            @PathParam("tenant") String tenant,
+            @PathParam("cluster") String cluster,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") String topic,
+            @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative,
+            @Suspended final AsyncResponse response
+    ) {
+        validateTopicName(tenant, cluster, namespace, topic);
+        getSchemaMetadataAsync(authoritative)
+                .thenAccept(response::resume)
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to get schema metadata for topic 
{}", clientAppId(), topicName, ex);
+                    resumeAsyncResponseExceptionally(response, ex);
+                    return null;
+                });
+    }
+
     @DELETE
     @Path("/{tenant}/{cluster}/{namespace}/{topic}/schema")
     @Produces(MediaType.APPLICATION_JSON)
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
index dd8ed58c853..07758436f6c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
@@ -166,6 +166,36 @@ public class SchemasResource extends SchemasResourceBase {
                 });
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/metadata")
+    @Produces(MediaType.APPLICATION_JSON)
+    @ApiOperation(value = "Get the schema metadata of a topic", response = 
GetAllVersionsSchemaResponse.class)
+    @ApiResponses(value = {
+            @ApiResponse(code = 307, message = "Current broker doesn't serve 
the namespace of this topic"),
+            @ApiResponse(code = 401, message = "Client is not authorized or 
Don't have admin permission"),
+            @ApiResponse(code = 403, message = "Client is not authenticated"),
+            @ApiResponse(code = 404,
+                    message = "Tenant or Namespace or Topic doesn't exist; or 
Schema is not found for this topic"),
+            @ApiResponse(code = 412, message = "Failed to find the ownership 
for the topic"),
+            @ApiResponse(code = 500, message = "Internal Server Error"),
+    })
+    public void getSchemaMetadata(
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") String topic,
+            @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative,
+            @Suspended final AsyncResponse response
+    ) {
+        validateTopicName(tenant, namespace, topic);
+        getSchemaMetadataAsync(authoritative)
+                .thenAccept(response::resume)
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to get schema metadata for topic 
{}", clientAppId(), topicName, ex);
+                    resumeAsyncResponseExceptionally(response, ex);
+                    return null;
+                });
+    }
+
     @DELETE
     @Path("/{tenant}/{namespace}/{topic}/schema")
     @Produces(MediaType.APPLICATION_JSON)
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index 85c8aa06458..f68cdd6473e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -52,8 +52,11 @@ import 
org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry;
+import 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator;
 import 
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
 import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
+import org.apache.pulsar.common.policies.data.SchemaMetadata;
 import org.apache.pulsar.common.protocol.schema.SchemaStorage;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.protocol.schema.StoredSchema;
@@ -554,6 +557,23 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
                         o.map(r -> new LocatorEntry(r.getValue(), 
r.getStat().getVersion())));
     }
 
+    public CompletableFuture<SchemaMetadata> getSchemaMetadata(String schema) {
+        return getLocator(schema).thenApply(locator -> {
+            if (!locator.isPresent()) {
+                return null;
+            }
+            SchemaLocator sl = locator.get().locator;
+            SchemaMetadata metadata = new SchemaMetadata();
+            IndexEntry info = sl.getInfo();
+            metadata.info = new 
SchemaMetadata.Entry(info.getPosition().getLedgerId(), 
info.getPosition().getEntryId(),
+                    info.getVersion());
+            metadata.index = sl.getIndexList() == null ? null
+                    : sl.getIndexList().stream().map(i -> new 
SchemaMetadata.Entry(i.getPosition().getLedgerId(),
+                            i.getPosition().getEntryId(), 
i.getVersion())).collect(Collectors.toList());
+            return metadata;
+        });
+    }
+
     @NotNull
     private CompletableFuture<Long> addEntry(LedgerHandle ledgerHandle, 
SchemaStorageFormat.SchemaEntry entry) {
         final CompletableFuture<Long> future = new CompletableFuture<>();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index ae9ea6d5ae6..ab82f981b5d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -23,6 +23,7 @@ import static 
org.apache.pulsar.schema.compatibility.SchemaCompatibilityCheckTes
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertThrows;
@@ -84,6 +85,7 @@ import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
+import org.apache.pulsar.common.policies.data.SchemaMetadata;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
@@ -1492,4 +1494,28 @@ public class SchemaTest extends 
MockedPulsarServiceBaseTest {
         consumer.close();
         producer.close();
     }
+
+    @Test
+    public void testTopicSchemaMetadata() throws Exception {
+        final String tenant = PUBLIC_TENANT;
+        final String namespace = "test-namespace-" + randomName(16);
+        final String topicOne = "metadata-topic";
+        final String topicName = TopicName.get(TopicDomain.persistent.value(), 
tenant, namespace, topicOne).toString();
+
+        admin.namespaces().createNamespace(tenant + "/" + namespace, 
Sets.newHashSet(CLUSTER_NAME));
+
+        @Cleanup
+        Producer<Schemas.PersonTwo> producer = pulsarClient
+                .newProducer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo> 
builder().withAlwaysAllowNull(false)
+                        
.withSupportSchemaVersioning(true).withPojo(Schemas.PersonTwo.class).build()))
+                .topic(topicName).create();
+
+        SchemaMetadata metadata = admin.schemas().getSchemaMetadata(topicName);
+
+        assertNotNull(metadata);
+        assertNotNull(metadata.info);
+        assertNotEquals(metadata.info.getLedgerId(), 0);
+        assertEquals(metadata.info.getEntryId(), 0);
+        assertEquals(metadata.index.size(), 1);
+    }
 }
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Schemas.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Schemas.java
index 9a1eb67d2e5..ca8bed25370 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Schemas.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Schemas.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.admin;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.common.policies.data.SchemaMetadata;
 import org.apache.pulsar.common.protocol.schema.IsCompatibilityResponse;
 import org.apache.pulsar.common.protocol.schema.PostSchemaPayload;
 import org.apache.pulsar.common.schema.SchemaInfo;
@@ -233,4 +234,19 @@ public interface Schemas {
      * @param topic topic name, in fully qualified format
      */
     CompletableFuture<List<SchemaInfo>> getAllSchemasAsync(String topic);
+
+    /**
+     * Get schema metadata of the <tt>topic</tt>.
+     *
+     * @param topic topic name, in fully qualified format
+     * @throws PulsarAdminException
+     */
+    SchemaMetadata getSchemaMetadata(String topic) throws PulsarAdminException;
+
+    /**
+     * Get schema metadata of the <tt>topic</tt> asynchronously.
+     *
+     * @param topic topic name, in fully qualified format
+     */
+    CompletableFuture<SchemaMetadata> getSchemaMetadataAsync(String topic);
 }
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SchemaMetadata.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SchemaMetadata.java
new file mode 100644
index 00000000000..ff6ba6e8649
--- /dev/null
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SchemaMetadata.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data;
+
+import java.util.List;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Schema metadata info.
+ */
+@Data
+public class SchemaMetadata {
+
+    public Entry info;
+    public List<Entry> index;
+
+    @Data
+    @AllArgsConstructor
+    @NoArgsConstructor
+    public static class Entry {
+        private long ledgerId;
+        private long entryId;
+        private long version;
+
+        @Override
+        public String toString() {
+            return String.format("ledgerId=[%d], entryId=[%d], version=[%d]", 
ledgerId, entryId, version);
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
index 28b435ab567..7f2383e1e52 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.client.admin.Schemas;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.internal.DefaultImplementation;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.SchemaMetadata;
 import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse;
 import org.apache.pulsar.common.protocol.schema.GetAllVersionsSchemaResponse;
 import org.apache.pulsar.common.protocol.schema.GetSchemaResponse;
@@ -276,6 +277,19 @@ public class SchemasImpl extends BaseResource implements 
Schemas {
                         .collect(Collectors.toList()));
     }
 
+    @Override
+    public SchemaMetadata getSchemaMetadata(String topic) throws 
PulsarAdminException {
+        return sync(() -> getSchemaMetadataAsync(topic));
+    }
+
+    @Override
+    public CompletableFuture<SchemaMetadata> getSchemaMetadataAsync(String 
topic) {
+        TopicName tn = TopicName.get(topic);
+        WebTarget path = metadata(tn);
+        return asyncGetRequest(path, new FutureCallback<SchemaMetadata>(){});
+    }
+
+
     private WebTarget schemaPath(TopicName topicName) {
         return topicPath(topicName, "schema");
     }
@@ -292,6 +306,10 @@ public class SchemasImpl extends BaseResource implements 
Schemas {
         return topicPath(topicName, "compatibility");
     }
 
+    private WebTarget metadata(TopicName topicName) {
+        return topicPath(topicName, "metadata");
+    }
+
     private WebTarget topicPath(TopicName topic, String... parts) {
         final WebTarget base = topic.isV2() ? adminV2 : adminV1;
         WebTarget topicPath = base.path(topic.getRestPath(false));
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSchemas.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSchemas.java
index ab8fdc1f013..9131f11f3d3 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSchemas.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSchemas.java
@@ -44,6 +44,7 @@ public class CmdSchemas extends CmdBase {
         addCommand("delete", new DeleteSchema());
         addCommand("upload", new UploadSchema());
         addCommand("extract", new ExtractSchema());
+        addCommand("metadata", new GetSchemaMetadata());
         addCommand("compatibility", new TestCompatibility());
     }
 
@@ -77,6 +78,18 @@ public class CmdSchemas extends CmdBase {
         }
     }
 
+    @Command(description = "Get the schema for a topic")
+    private class GetSchemaMetadata extends CliCommand {
+        @Parameters(description = "persistent://tenant/namespace/topic", arity 
= "1")
+        private String topicName;
+
+        @Override
+        void run() throws Exception {
+            String topic = validateTopicName(topicName);
+            print(getAdmin().schemas().getSchemaMetadata(topic));
+        }
+    }
+
     @Command(description = "Delete all versions schema of a topic")
     private class DeleteSchema extends CliCommand {
         @Parameters(description = "persistent://tenant/namespace/topic", arity 
= "1")
diff --git 
a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSchema.java
 
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSchema.java
new file mode 100644
index 00000000000..b61ac3b8ef3
--- /dev/null
+++ 
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSchema.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.admin.cli;
+
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.Schemas;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class TestCmdSchema {
+
+    private PulsarAdmin pulsarAdmin;
+
+    private CmdSchemas cmdSchemas;
+
+    private Schemas schemas;
+
+    @BeforeMethod
+    public void setup() throws Exception {
+        pulsarAdmin = mock(PulsarAdmin.class);
+        schemas = mock(Schemas.class);
+        when(pulsarAdmin.schemas()).thenReturn(schemas);
+        cmdSchemas = spy(new CmdSchemas(() -> pulsarAdmin));
+    }
+
+    @Test
+    public void testCmdClusterConfigFile() throws Exception {
+        String topic = "persistent://tenant/ns1/t1";
+        cmdSchemas.run(new String[]{"metadata", topic});
+        verify(schemas).getSchemaMetadata(eq(topic));
+    }
+}

Reply via email to